From a97423e71f55606f0451bd305304810fdf34982b Mon Sep 17 00:00:00 2001
From: zhaoxiangpeng <1943364377@qq.com>
Date: Tue, 13 Jan 2026 16:20:07 +0800
Subject: [PATCH] cnki: collect from all databases
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../science_article_cnki/items.py             |  24 ++-
 .../science_article_cnki/pipelines.py         |  70 ++++++-
 .../spiders/cnki_article_crossdb.py           | 183 ++++++++++++++++++
 .../starter/crawl_article_crossdb.py          |  54 ++++++
 4 files changed, 326 insertions(+), 5 deletions(-)
 create mode 100644 science_article_cnki/science_article_cnki/spiders/cnki_article_crossdb.py
 create mode 100644 science_article_cnki/starter/crawl_article_crossdb.py

diff --git a/science_article_cnki/science_article_cnki/items.py b/science_article_cnki/science_article_cnki/items.py
index 49c7457..12e8d4e 100644
--- a/science_article_cnki/science_article_cnki/items.py
+++ b/science_article_cnki/science_article_cnki/items.py
@@ -21,10 +21,32 @@ class ArticleItem(AddItemBase):
     exported = scrapy.Field()
 
 
-class IdRelationItem(AddItemBase):
+class CnkiArticleItem(ArticleItem):
+    """Item for a CNKI article."""
+    __tablename__ = 'data_cnki_article'
+
+    third_id = scrapy.Field()
+    exported = scrapy.Field()
+    updated_at = scrapy.Field()
+
+
+class CnkiIdRelationItem(AddItemBase):
+    __tablename__ = 'relation_school_cnki'
+
     query_ids = scrapy.Field()
     school_ids = scrapy.Field()
     task_ids = scrapy.Field()
+    year = scrapy.Field()
+
+
+class CnkiArticleTodoIdItem(scrapy.Item):
+    __tablename__ = 'todo_ids_cnki'
+
+    third_id = scrapy.Field()
+    db_code = scrapy.Field()
+    state = scrapy.Field()
+    ti = scrapy.Field()
+    v = scrapy.Field()
 
 
 class ArticleCitedItem(AddItemBase):
diff --git a/science_article_cnki/science_article_cnki/pipelines.py b/science_article_cnki/science_article_cnki/pipelines.py
index dc9a93a..c987160 100644
--- a/science_article_cnki/science_article_cnki/pipelines.py
+++ b/science_article_cnki/science_article_cnki/pipelines.py
@@ -24,12 +24,13 @@ from pymongo.errors import (
     DuplicateKeyError,
     BulkWriteError
 )
-
+from science_article_cnki.items import CnkiArticleTodoIdItem
 from science_article_cnki.db_utils.mongo import MongoDBUtils, update_document, build_update_query
 
 if TYPE_CHECKING:
     from scrapy.crawler import Crawler
     from scrapy.statscollectors import StatsCollector
+    from pymongo.collection import Collection
 
 mongo_logger = logging.getLogger('pymongo')
 mongo_logger.setLevel(logging.WARNING)
@@ -57,8 +58,11 @@ class MongoPipeline(MongoDBUtils):
 
     def process_item(self, item, spider):
         # Determine the item type
+        # Todo-id items are handled by DupTodoPipeline; pass them through untouched
+        if isinstance(item, CnkiArticleTodoIdItem):
+            return item
+
         adapter = ItemAdapter(item)
-        item_type = self._get_item_type(item)
+        item_type = self._get_item_table(item)
         collection = self.db.get_collection(item_type)
         d = adapter.asdict()
         try:
@@ -71,10 +75,14 @@ class MongoPipeline(MongoDBUtils):
             key_value = write_error.get('keyValue')
             logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
             d.pop("_id", None)
+            updated_at = d.pop('updated_at', None)
             [d.pop(k, None) for k in key_pattern.keys()]
             update_q = build_update_query(d, replace=self.duplicate_cover_enable)
             up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
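+            # matched_count == modified_count == 1 means an existing document was
+            # actually changed (an upsert-insert reports matched_count 0), so only
+            # then refresh its updated_at stamp; a no-op update keeps the old one.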
+            if up_result.matched_count == up_result.modified_count == 1:
+                collection.update_one(filter=key_value, update={"$set": {"updated_at": updated_at}})
+
+            self.stats.inc_value("item2db_updated/{}".format(item_type))
         except Exception:
             raise
@@ -84,9 +92,63 @@ class MongoPipeline(MongoDBUtils):
         self.client.close()
 
     @staticmethod
-    def _get_item_type(item) -> str:
+    def _get_item_table(item) -> str:
         """Get the item's table name"""
         if hasattr(item, '__tablename__'):
             return item.__class__.__tablename__
         return 'items_null_table'
+
+
+class DupTodoPipeline(MongoDBUtils):
+    def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
+        super().__init__(mongo_uri, mongo_db)
+        self.stats: StatsCollector = stats
+
+    @classmethod
+    def from_crawler(cls, crawler: Crawler):
+        return cls(
+            mongo_uri=crawler.settings.get("MONGO_URI"),
+            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
+            stats=crawler.stats
+        )
+
+    def open_spider(self, spider):
+        self.client = MongoClient(self.mongo_uri)
+        self.db = self.client[self.mongo_db]
+
+    def process_item(self, item, spider):
+        if isinstance(item, CnkiArticleTodoIdItem):
+            try:
+                # Only queue the id if the article has not been collected yet
+                if not self.is_exists(item, filter_key=self._get_dup_key(spider)):
+                    table_name = self._get_item_table(spider)
+                    coll = self.db.get_collection(table_name)
+                    adapter = ItemAdapter(item)
+                    d = adapter.asdict()
+                    coll.insert_one(d)
+                    self.stats.inc_value("item2db_inserted/{}".format(table_name), count=1)
+            except DuplicateKeyError as duplicate_error:
+                logger.warning(duplicate_error)
+            except Exception:
+                raise
+        return item
+
+    def is_exists(self, item, filter_key) -> bool:
+        fingerprints = item.get('third_id')
+        collection: Collection = self.db.get_collection(filter_key)
+        results = collection.find_one(filter={"third_id": fingerprints}, projection={"_id": 0, "third_id": 1})
+        if results and results.get('third_id') == fingerprints:
+            self.inc_item_dropped_count("duplicate")
+            return True
+        return False
+
+    def _get_dup_key(self, spider):
+        return 'data_%(source_type)s_article' % {"source_type": spider.source}
+
+    def _get_item_table(self, spider) -> str:
+        """Get the todo table name"""
+        return 'todo_ids_%(source_type)s' % {"source_type": spider.source}
+
+    def inc_item_dropped_count(self, reason):
+        self.stats.inc_value("item_dropped_count")
+        self.stats.inc_value(f"item_dropped_reasons_count/{reason}")
diff --git a/science_article_cnki/science_article_cnki/spiders/cnki_article_crossdb.py b/science_article_cnki/science_article_cnki/spiders/cnki_article_crossdb.py
new file mode 100644
index 0000000..97bda17
--- /dev/null
+++ b/science_article_cnki/science_article_cnki/spiders/cnki_article_crossdb.py
@@ -0,0 +1,183 @@
+from __future__ import annotations
+import math
+from copy import deepcopy
+from datetime import datetime
+from pprint import pformat
+
+import scrapy
+
+from science_article_cnki.items import CnkiIdRelationItem, CnkiArticleTodoIdItem, CnkiCitedNumberItem
+from science_article_cnki.models import cnki_model as model
+from science_article_cnki.utils import tools
+from science_article_cnki.utils.tools import add_year2item
+from science_article_cnki.utils.ti_match_id import ti2format, ti2unique_type2
+from science_article_cnki.configs import cnki as config
+
+
+class CnkiArticleCrossdbSpider(scrapy.Spider):
+    name = "cnki_article_crossdb"
+    custom_settings = dict(
+        DOWNLOADER_MIDDLEWARES={
+            "science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
+        },
+        ITEM_PIPELINES={
+            "science_article_cnki.pipelines.MongoPipeline": 300,
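+            # Order matters: MongoPipeline (300) persists article/relation items
+            # and passes todo-id items through; DupTodoPipeline (310) then queues
+            # third_ids that are not yet present in the article table.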
"science_article_cnki.pipelines.DupTodoPipeline": 310, + # "science_article_cnki.pipelines.verify_data.VerifyDataIntegrity": 400, + }, + # LOG_LEVEL="INFO" + ) + source = 'cnki' + + resource_type: str = "总库" + + query_id: int + query: str + filters: list = list() + + def open_spider(self): + """ + """ + pass + + async def start(self): + m = dict(query=self.query, resource_type=self.resource_type, page=1) + m.update(filters=self.filters) + query_body = model.adv_refine_search(**m) + # 把筛选项加到查询体中 + model.add_muti_filters(base_query=query_body, filters=m.get("filters")) + form_d = model.adv_query_search(query_body, **m) + yield scrapy.FormRequest(url=config.CNKI_ADV_SEARCH_API, method="POST", + formdata=form_d, meta=dict(REQUEST_Q=m)) + + def parse(self, response, **kwargs): + """ + 首次请求会进入这个解析 + """ + request_q = response.meta["REQUEST_Q"] + msg = """当前检索: %(query)s,\n筛选项: %(filters)s,\n页数: %(page)s""" + kws = { + "query": request_q.get("query"), + "filters": pformat(request_q.get("filters", [])), + "page": '{c}/{m}'.format(c=request_q.get("page", 1), m=request_q.get("max_page", 'null')) + } + self.logger.info(msg % kws) + + # -------------------------------------------- 计算一共有多少页的逻辑 -------------------------------------------- + # 提取检索结果的数量 + total_prm = response.xpath('//span[@class="pagerTitleCell"]/em/text()').get() + if not total_prm: + return + total = tools.str2int(total_prm.replace(',', '')) # 格式化数量字符串并转int + + # 计算一共有多少页 + max_page = math.ceil(total / config.BATCH_SEARCH_RESULT_LIMIT) + request_q['max_page'] = max_page + batch_time = datetime.now() + # ---------------------------------------------- 提取列表文章的逻辑 ---------------------------------------------- + tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr') + for tr_node in tr_nodes: + article_title = tr_node.xpath('./td[@class="name"]/a//text()').getall() # 文章标题 + article_title = article_title and ''.join(article_title) + article_link = tr_node.xpath('./td[@class="name"]/a/@href').get() # 文章链接(有v值) + source_title = tr_node.xpath('./td[@class="source"]/*/a/text()').get() # 出版物名称(刊名) + db_name = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-dbname').get() # 收录库 + third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get() # 三方id + cited_str = tr_node.xpath('./td[@class="quote"]/span/a/text()').get() # 被引量字符串 + + param = tools.url_parse(article_link) + v = param.get('v') + ti_format = ti2format(article_title) + ti_unique = ti2unique_type2(ti=ti_format, so=source_title) + + if third_id: + relation_item = CnkiIdRelationItem() + relation_item['third_id'] = third_id + relation_item['query_ids'] = [self.query_id] + # 给关系添加年份 + add_year2item(relation_item, request_q.get("year"), tr_node.xpath('./td[@class="date"]/text()').get()) + relation_item['updated_at'] = batch_time + yield relation_item + + if cited_str: + cited_item = CnkiCitedNumberItem(**dict(third_id=third_id, cited=tools.str2int(cited_str, 0), updated_at=batch_time)) + yield cited_item + yield CnkiArticleTodoIdItem(**dict(third_id=third_id, db_code=db_name, ti=ti_unique, v=v, state=0)) + + q_bak: dict = deepcopy(request_q) + q_bak['page'] += 1 + query_body = model.adv_refine_search(**q_bak) + model.add_muti_filters(base_query=query_body, filters=q_bak.get("filters")) + search_param = model.adv_query_search(query_body, **q_bak) + yield scrapy.FormRequest( + url=config.CNKI_ADV_SEARCH_API, method="POST", + formdata=search_param, + callback=self.parse_other_page, + 
+        # Nothing beyond the last page: parse_other_page applies the same guard.
+        if request_q['page'] >= max_page:
+            return
+        q_bak: dict = deepcopy(request_q)
+        q_bak['page'] += 1
+        query_body = model.adv_refine_search(**q_bak)
+        model.add_muti_filters(base_query=query_body, filters=q_bak.get("filters"))
+        search_param = model.adv_query_search(query_body, **q_bak)
+        yield scrapy.FormRequest(
+            url=config.CNKI_ADV_SEARCH_API, method="POST",
+            formdata=search_param,
+            callback=self.parse_other_page,
+            meta=dict(REQUEST_Q=q_bak)
+        )
+
+    async def parse_other_page(self, response, **kwargs):
+        priority = response.request.priority
+        request_q = response.meta["REQUEST_Q"]
+        msg = """Current query: %(query)s,\nfilters: %(filters)s,\npage: %(page)s"""
+        kws = {
+            "query": request_q.get("query"),
+            "filters": pformat(request_q.get("filters", [])),
+            "page": '{c}/{m}'.format(c=request_q.get("page", 1), m=request_q.get("max_page", 'null'))
+        }
+        self.logger.info(msg % kws)
+        batch_time = datetime.now()
+        # ------------------------ extract the articles in the list ------------------------
+        tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr')
+        for tr_node in tr_nodes:
+            article_title = tr_node.xpath('./td[@class="name"]/a//text()').getall()  # article title
+            article_title = article_title and ''.join(article_title)
+            article_link = tr_node.xpath('./td[@class="name"]/a/@href').get()  # article link (carries the v value)
+            source_title = tr_node.xpath('./td[@class="source"]/*/a/text()').get()  # publication (journal) title
+            db_name = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-dbname').get()  # source database
+            third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get()  # third-party id
+            cited_str = tr_node.xpath('./td[@class="quote"]/span/a/text()').get()  # citation-count string
+
+            param = tools.url_parse(article_link)
+            v = param.get('v')
+            ti_format = ti2format(article_title)
+            ti_unique = ti2unique_type2(ti=ti_format, so=source_title)
+            if third_id:
+                relation_item = CnkiIdRelationItem()
+                relation_item['third_id'] = third_id
+                relation_item['query_ids'] = [self.query_id]
+                # Attach the year to the relation
+                add_year2item(relation_item, request_q.get("year"), tr_node.xpath('./td[@class="date"]/text()').get())
+                relation_item['updated_at'] = batch_time
+                yield relation_item
+                if cited_str:
+                    cited_item = CnkiCitedNumberItem(**dict(third_id=third_id, cited=tools.str2int(cited_str, 0), updated_at=batch_time))
+                    yield cited_item
+                yield CnkiArticleTodoIdItem(**dict(third_id=third_id, db_code=db_name, ti=ti_unique, v=v, state=0))
+
+        # ------------------------------ pagination ------------------------------
+        if request_q['page'] < request_q['max_page']:
+            q_bak = deepcopy(request_q)
+            # 2023-06-29 14:56:44 reverse-order handling:
+            # CNKI caps a single search at 6,000 records, i.e. 6000 / 50 = 120 pages.
+            # When 6,000 < total < 12,000, re-running the search in ascending order
+            # fetches the remainder from the other end. Example: total = 9,000 gives
+            # max_page = 180; the descending pass covers pages 1-120 (6,000 records),
+            # so the ascending pass needs 180 - 120 + 2 = 62 pages (the +2 presumably
+            # keeps a small overlap so nothing is skipped at the boundary).
+            if q_bak['page'] >= 120 and q_bak.get('sort') != 'asc':
+                q_bak['page'] = 0
+                q_bak['sort'] = 'asc'
+                q_bak['max_page_sum'] = q_bak['max_page']
+                q_bak['max_page'] = q_bak['max_page_sum'] - 120 + 2
+            # end of reverse-order handling
+            q_bak['page'] += 1
+            query_body = model.adv_refine_search(**q_bak)
+            model.add_muti_filters(base_query=query_body, filters=q_bak.get("filters"))
+            search_param = model.adv_query_search(query_body, **q_bak)
+            yield scrapy.FormRequest(
+                url=config.CNKI_ADV_SEARCH_API, method="POST",
+                formdata=search_param, priority=priority,
+                callback=self.parse_other_page,
+                meta=dict(REQUEST_Q=q_bak)
+            )
diff --git a/science_article_cnki/starter/crawl_article_crossdb.py b/science_article_cnki/starter/crawl_article_crossdb.py
new file mode 100644
index 0000000..8b4990a
--- /dev/null
+++ b/science_article_cnki/starter/crawl_article_crossdb.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+# @Time   : 2026/1/12 14:13
+# @Author : zhaoxiangpeng
+# @File   : crawl_article_crossdb.py
+from twisted.internet import defer
+from scrapy.crawler import CrawlerProcess
+from scrapy.utils.project import get_project_settings
+
+from science_article_cnki.spiders.cnki_article_crossdb import CnkiArticleCrossdbSpider
+
+
+def starter_by_year():
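+    """Run one crawl per year; inlineCallbacks chains the runs sequentially."""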
+    @defer.inlineCallbacks
+    def f(range_list: list = None):
+        for y in range_list:
+            init_params = {
+                'query_id': 1609,
+                'query': '(作者单位:河北工程技术学院(模糊))',
+                # 'query_condition': {'year': str(y)},
+                'filters': [
+                    dict(project="年度", value=f"{y}", text_or_title=f"{y}年"),
+                ]
+            }
+            # yield waits for the previous crawl to finish before starting the next
+            yield process.crawl(CnkiArticleCrossdbSpider, **init_params)
+
+    process = CrawlerProcess(get_project_settings())
+    f(list(range(2021, 2022)))
+    process.start()
+
+
+def starter_more_year():
+    """Run a single crawl whose year filter covers several years at once."""
+    @defer.inlineCallbacks
+    def f(years: list = None):
+        init_params = {
+            'query_id': 1611,
+            'query': '(作者单位:武昌首义学院(模糊))',
+            'filters': [
+                dict(project="年度", value=[f"{y}" for y in years], text_or_title=[f"{y}年" for y in years]),
+            ]
+        }
+        yield process.crawl(CnkiArticleCrossdbSpider, **init_params)
+
+    process = CrawlerProcess(get_project_settings())
+    f(list(range(2021, 2026)))
+    process.start()
+
+
+def starter():
+    process = CrawlerProcess(get_project_settings())
+    process.crawl(CnkiArticleCrossdbSpider)
+    process.start()
+
+
+if __name__ == '__main__':
+    starter_more_year()