cnki: collect from all databases

main
zhaoxiangpeng 3 weeks ago
parent 7883a2d349
commit a97423e71f

@@ -21,10 +21,32 @@ class ArticleItem(AddItemBase):
    exported = scrapy.Field()


class CnkiArticleItem(ArticleItem):
    """CNKI article item"""
    __tablename__ = 'data_cnki_article'
    third_id = scrapy.Field()
    exported = scrapy.Field()
    updated_at = scrapy.Field()


class CnkiIdRelationItem(AddItemBase):
    __tablename__ = 'relation_school_cnki'
    query_ids = scrapy.Field()
    school_ids = scrapy.Field()
    task_ids = scrapy.Field()
    year = scrapy.Field()


class CnkiArticleTodoIdItem(scrapy.Item):
    __tablename__ = 'todo_ids_cnki'
    third_id = scrapy.Field()
    db_code = scrapy.Field()
    state = scrapy.Field()
    ti = scrapy.Field()
    v = scrapy.Field()


class ArticleCitedItem(AddItemBase):
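Each of these item classes carries a __tablename__ that the pipelines below use as the MongoDB collection name. A minimal sketch of how such an item is built and serialized the way the pipelines do it (the field values are made up for illustration):

# Sketch only: construct a todo item and serialize it as the pipelines do.
# Field values here are illustrative, not real CNKI identifiers.
from itemadapter import ItemAdapter
from science_article_cnki.items import CnkiArticleTodoIdItem

item = CnkiArticleTodoIdItem(third_id="ABC123", db_code="CJFD",
                             ti="example-title-key", v="example-v", state=0)
doc = ItemAdapter(item).asdict()                  # plain dict, ready for insert_one
collection_name = item.__class__.__tablename__    # 'todo_ids_cnki'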

@@ -24,12 +24,13 @@ from pymongo.errors import (
    DuplicateKeyError,
    BulkWriteError
)
from science_article_cnki.items import CnkiArticleTodoIdItem
from science_article_cnki.db_utils.mongo import MongoDBUtils, update_document, build_update_query

if TYPE_CHECKING:
    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector
    from pymongo.collection import Collection

mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
@@ -57,8 +58,11 @@ class MongoPipeline(MongoDBUtils):

    def process_item(self, item, spider):
        # Determine the item type
        if isinstance(item, CnkiArticleTodoIdItem):
            return item
        adapter = ItemAdapter(item)
        item_type = self._get_item_table(item)
        collection = self.db.get_collection(item_type)
        d = adapter.asdict()
        try:
@@ -71,9 +75,13 @@ class MongoPipeline(MongoDBUtils):
            key_value = write_error.get('keyValue')
            logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
            d.pop("_id", None)
            updated_at = d.pop('updated_at', None)
            [d.pop(k, None) for k in key_pattern.keys()]
            update_q = build_update_query(d, replace=self.duplicate_cover_enable)
            up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
            if up_result.matched_count == up_result.modified_count == 1:
                current_time = datetime.now()
                collection.update_one(filter=key_value, update={"$set": {"updated_at": updated_at}})
            self.stats.inc_value("item2db_updated/{}".format(item_type))
        except Exception:
            raise
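For reference, the same duplicate-key recovery pattern in isolation, assuming a collection with a unique index. build_update_query is project-specific, so a plain $set update stands in for it here, and the connection URI, collection and document values are placeholders:

# Standalone sketch of recovering from a duplicate-key insert with an upsert.
# build_update_query is project-specific; a plain $set stands in for it.
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

coll = MongoClient("mongodb://localhost:27017")["items"]["data_cnki_article"]
doc = {"third_id": "ABC123", "title": "example", "updated_at": None}
try:
    coll.insert_one(dict(doc))          # insert a copy: insert_one adds _id in place
except DuplicateKeyError as err:
    # keyValue is reported by modern MongoDB servers; fall back to the known key
    key_value = (err.details or {}).get("keyValue") or {"third_id": doc["third_id"]}
    update = {k: v for k, v in doc.items() if k not in key_value}
    coll.update_one(key_value, {"$set": update}, upsert=True)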
@@ -84,9 +92,63 @@ class MongoPipeline(MongoDBUtils):
        self.client.close()

    @staticmethod
    def _get_item_table(item) -> str:
        """Get the item's collection (table) name"""
        if hasattr(item, '__tablename__'):
            return item.__class__.__tablename__
        return 'items_null_table'


class DupTodoPipeline(MongoDBUtils):
    def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
        super().__init__(mongo_uri, mongo_db)
        self.stats: StatsCollector = stats

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            stats=crawler.stats
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        if isinstance(item, CnkiArticleTodoIdItem):
            fingerprints = item.get('third_id')
            try:
                if not self.is_exists(item, filter_key=self._get_dup_key(spider)):
                    table_name = self._get_item_table(spider)
                    coll = self.db.get_collection(table_name)
                    adapter = ItemAdapter(item)
                    d = adapter.asdict()
                    insert_result = coll.insert_one(d)
                    self.stats.inc_value("item2db_inserted/{}".format(table_name), count=1)
            except DuplicateKeyError as duplicate_error:
                logger.warning(duplicate_error)
            except Exception as e:
                raise e
        return item

    def is_exists(self, item, filter_key) -> bool:
        fingerprints = item.get('third_id')
        collection: Collection = self.db.get_collection(filter_key)
        results = collection.find_one(filter={"third_id": fingerprints}, projection={"_id": 0, "third_id": 1})
        if results and results.get('third_id') == fingerprints:
            self.inc_item_dropped_count("duplicate")
            return True
        return False

    def _get_dup_key(self, spider):
        return 'data_%(source_type)s_article' % {"source_type": spider.source}

    def _get_item_table(self, spider) -> str:
        """Get the todo-id collection (table) name for the spider's source"""
        return 'todo_ids_%(source_type)s' % {"source_type": spider.source}

    def inc_item_dropped_count(self, reason):
        self.stats.inc_value("item_dropped_count")
        self.stats.inc_value(f"item_dropped_reasons_count/{reason}")
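The is_exists check and the DuplicateKeyError fallback in DupTodoPipeline both assume a unique index on third_id. If the indexes are not created elsewhere in the project, a one-off setup along these lines would be needed (collection names are the ones used in this diff; the URI is a placeholder):

# One-off index setup so duplicate third_id inserts raise DuplicateKeyError.
from pymongo import ASCENDING, MongoClient

db = MongoClient("mongodb://localhost:27017")["items"]
db["todo_ids_cnki"].create_index([("third_id", ASCENDING)], unique=True)
db["data_cnki_article"].create_index([("third_id", ASCENDING)], unique=True)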

@@ -0,0 +1,183 @@
from __future__ import annotations
import math
from copy import deepcopy
from datetime import datetime
from typing import TYPE_CHECKING, Any, Self
from pprint import pformat

import scrapy

from science_article_cnki.items import CnkiIdRelationItem, CnkiArticleTodoIdItem, CnkiCitedNumberItem
from science_article_cnki.models.enum_cls import SingleResultEnum
from science_article_cnki.models import cnki_model as model
from science_article_cnki.utils import tools
from science_article_cnki.utils.tools import parse_datetime, add_year2item
from science_article_cnki.utils.ti_match_id import ti2format, ti2unique_type2
from science_article_cnki.configs import cnki as config


class CnkiArticleCrossdbSpider(scrapy.Spider):
    name = "cnki_article_crossdb"
    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={
            "science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
        },
        ITEM_PIPELINES={
            "science_article_cnki.pipelines.MongoPipeline": 300,
            "science_article_cnki.pipelines.DupTodoPipeline": 310,
            # "science_article_cnki.pipelines.verify_data.VerifyDataIntegrity": 400,
        },
        # LOG_LEVEL="INFO"
    )
    source = 'cnki'
    resource_type: str = "总库"
    query_id: int
    query: str
    filters: list = list()

    def open_spider(self):
        """
        """
        pass

    async def start(self):
        m = dict(query=self.query, resource_type=self.resource_type, page=1)
        m.update(filters=self.filters)
        query_body = model.adv_refine_search(**m)
        # Merge the filter options into the query body
        model.add_muti_filters(base_query=query_body, filters=m.get("filters"))
        form_d = model.adv_query_search(query_body, **m)
        yield scrapy.FormRequest(url=config.CNKI_ADV_SEARCH_API, method="POST",
                                 formdata=form_d, meta=dict(REQUEST_Q=m))
    def parse(self, response, **kwargs):
        """
        The first request is parsed here.
        """
        request_q = response.meta["REQUEST_Q"]
        msg = """Current query: %(query)s,\nfilters: %(filters)s,\npage: %(page)s"""
        kws = {
            "query": request_q.get("query"),
            "filters": pformat(request_q.get("filters", [])),
            "page": '{c}/{m}'.format(c=request_q.get("page", 1), m=request_q.get("max_page", 'null'))
        }
        self.logger.info(msg % kws)
        # ------------------------------ Work out how many result pages there are ------------------------------
        # Extract the number of search results
        total_prm = response.xpath('//span[@class="pagerTitleCell"]/em/text()').get()
        if not total_prm:
            return
        total = tools.str2int(total_prm.replace(',', ''))  # normalize the count string and convert it to int
        # Total number of pages
        max_page = math.ceil(total / config.BATCH_SEARCH_RESULT_LIMIT)
        request_q['max_page'] = max_page
        batch_time = datetime.now()
        # ------------------------------ Extract the articles from the result list ------------------------------
        tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr')
        for tr_node in tr_nodes:
            article_title = tr_node.xpath('./td[@class="name"]/a//text()').getall()  # article title
            article_title = article_title and ''.join(article_title)
            article_link = tr_node.xpath('./td[@class="name"]/a/@href').get()  # article link carrying the v parameter
            source_title = tr_node.xpath('./td[@class="source"]/*/a/text()').get()  # publication (journal) name
            db_name = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-dbname').get()  # source database
            third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get()  # third-party id
            cited_str = tr_node.xpath('./td[@class="quote"]/span/a/text()').get()  # citation count string
            param = tools.url_parse(article_link)
            v = param.get('v')
            ti_format = ti2format(article_title)
            ti_unique = ti2unique_type2(ti=ti_format, so=source_title)
            if third_id:
                relation_item = CnkiIdRelationItem()
                relation_item['third_id'] = third_id
                relation_item['query_ids'] = [self.query_id]
                # Attach the year to the relation
                add_year2item(relation_item, request_q.get("year"), tr_node.xpath('./td[@class="date"]/text()').get())
                relation_item['updated_at'] = batch_time
                yield relation_item
                if cited_str:
                    cited_item = CnkiCitedNumberItem(**dict(third_id=third_id, cited=tools.str2int(cited_str, 0), updated_at=batch_time))
                    yield cited_item
                yield CnkiArticleTodoIdItem(**dict(third_id=third_id, db_code=db_name, ti=ti_unique, v=v, state=0))

        q_bak: dict = deepcopy(request_q)
        q_bak['page'] += 1
        query_body = model.adv_refine_search(**q_bak)
        model.add_muti_filters(base_query=query_body, filters=q_bak.get("filters"))
        search_param = model.adv_query_search(query_body, **q_bak)
        yield scrapy.FormRequest(
            url=config.CNKI_ADV_SEARCH_API, method="POST",
            formdata=search_param,
            callback=self.parse_other_page,
            meta=dict(REQUEST_Q=q_bak)
        )
    async def parse_other_page(self, response, **kwargs):
        priority = response.request.priority
        request_q = response.meta["REQUEST_Q"]
        msg = """Current query: %(query)s,\nfilters: %(filters)s,\npage: %(page)s"""
        kws = {
            "query": request_q.get("query"),
            "filters": pformat(request_q.get("filters", [])),
            "page": '{c}/{m}'.format(c=request_q.get("page", 1), m=request_q.get("max_page", 'null'))
        }
        self.logger.info(msg % kws)
        batch_time = datetime.now()
        # ------------------------------ Extract the articles from the result list ------------------------------
        tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr')
        for tr_node in tr_nodes:
            article_title = tr_node.xpath('./td[@class="name"]/a/text()').get()  # article title
            article_link = tr_node.xpath('./td[@class="name"]/a/@href').get()  # article link carrying the v parameter
            source_title = tr_node.xpath('./td[@class="source"]/*/a/text()').get()  # publication (journal) name
            db_name = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-dbname').get()  # source database
            third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get()  # third-party id
            cited_str = tr_node.xpath('./td[@class="quote"]/span/a/text()').get()  # citation count string
            param = tools.url_parse(article_link)
            v = param.get('v')
            ti_format = ti2format(article_title)
            ti_unique = ti2unique_type2(ti=ti_format, so=source_title)
            if third_id:
                relation_item = CnkiIdRelationItem()
                relation_item['third_id'] = third_id
                relation_item['query_ids'] = [self.query_id]
                # Attach the year to the relation
                add_year2item(relation_item, request_q.get("year"), tr_node.xpath('./td[@class="date"]/text()').get())
                relation_item['updated_at'] = batch_time
                yield relation_item
                if cited_str:
                    cited_item = CnkiCitedNumberItem(**dict(third_id=third_id, cited=tools.str2int(cited_str, 0), updated_at=batch_time))
                    yield cited_item
                yield CnkiArticleTodoIdItem(**dict(third_id=third_id, db_code=db_name, ti=ti_unique, v=v, state=0))

        """
        # ------------------------------ Pagination ------------------------------
        """
        if request_q['page'] < request_q['max_page']:
            q_bak = deepcopy(request_q)
            """
            2023-06-29 14:56:44  Handle the reverse-order case.
            A single CNKI search is capped at 6,000 records, i.e. 6000/50 = 120 pages; when the result count is
            between 6,000 and 12,000, the remainder can be collected by re-running the search in ascending order.
            """
            # Logic for the 6,000-record cap
            if q_bak['page'] >= 120 and q_bak.get('sort') != 'asc':
                q_bak['page'] = 0
                q_bak['sort'] = 'asc'
                q_bak['max_page_sum'] = q_bak['max_page']
                q_bak['max_page'] = q_bak['max_page_sum'] - 120 + 2
            # End of the reverse-order handling
            q_bak['page'] += 1
            query_body = model.adv_refine_search(**q_bak)
            model.add_muti_filters(base_query=query_body, filters=q_bak.get("filters"))
            search_param = model.adv_query_search(query_body, **q_bak)
            yield scrapy.FormRequest(
                url=config.CNKI_ADV_SEARCH_API, method="POST",
                formdata=search_param, priority=priority,
                callback=self.parse_other_page,
                meta=dict(REQUEST_Q=q_bak)
            )
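As a worked example of the cap handling above, assuming BATCH_SEARCH_RESULT_LIMIT is 50 (matching the 6000/50 = 120 comment): with 9,500 hits, max_page = ceil(9500/50) = 190; the default ordering covers pages 1-120 (6,000 records), and the ascending pass then restarts with max_page reset to 190 - 120 + 2 = 72, the +2 giving a small overlap rather than a gap. A standalone sketch of just that arithmetic:

# Sketch of the two-pass page arithmetic only (no requests are made).
# PAGE_SIZE = 50 is an assumption matching the "6000/50 = 120" comment above.
import math

PAGE_SIZE = 50
CAP_PAGES = 120  # 6,000-record cap / 50 results per page

def plan_passes(total_hits: int) -> tuple[int, int]:
    max_page = math.ceil(total_hits / PAGE_SIZE)
    forward_pages = min(max_page, CAP_PAGES)
    # Pages re-fetched in ascending order once the cap is hit,
    # mirroring q_bak['max_page_sum'] - 120 + 2 in parse_other_page.
    reverse_pages = max_page - CAP_PAGES + 2 if max_page > CAP_PAGES else 0
    return forward_pages, reverse_pages

print(plan_passes(9_500))  # (120, 72): 120 pages in default order, 72 ascending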

@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/12 14:13
# @Author : zhaoxiangpeng
# @File : crawl_crossdb_article.py

from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from science_article_cnki.spiders.cnki_article_crossdb import CnkiArticleCrossdbSpider


def starter_by_year():
    @defer.inlineCallbacks
    def f(range_list: list = None):
        for y in range_list:
            init_params = {
                'query_id': 1609,
                'query': '(作者单位:河北工程技术学院(模糊)',
                # 'query_condition': {'year': str(y)},
                'filters': [
                    dict(project="年度", value=f"{y}", text_or_title=f"{y}"),
                ]
            }
            yield process.crawl(CnkiArticleCrossdbSpider, **init_params)

    process = CrawlerProcess(get_project_settings())
    f(list(range(2021, 2022)))
    process.start()


def starter_more_year():
    @defer.inlineCallbacks
    def f(years: list = None):
        init_params = {
            'query_id': 1611,
            'query': '(作者单位:武昌首义学院(模糊)',
            'filters': [
                dict(project="年度", value=[f"{y}" for y in years], text_or_title=[f"{y}" for y in years]),
            ]
        }
        yield process.crawl(CnkiArticleCrossdbSpider, **init_params)

    process = CrawlerProcess(get_project_settings())
    f(list(range(2021, 2026)))
    process.start()


def starter():
    process = CrawlerProcess(get_project_settings())
    process.crawl(CnkiArticleCrossdbSpider)
    process.start()


if __name__ == '__main__':
    starter_more_year()
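If the per-year runs need to execute strictly one after another, the pattern Scrapy documents for chaining crawls uses CrawlerRunner with the Twisted reactor. A hedged sketch of that alternative, with the spider and query values copied from starter_by_year above (not part of this commit):

# Alternative sequential runner using CrawlerRunner; a sketch, not part of this commit.
from twisted.internet import defer, reactor
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings

from science_article_cnki.spiders.cnki_article_crossdb import CnkiArticleCrossdbSpider


def run_years_sequentially(years):
    configure_logging()
    runner = CrawlerRunner(get_project_settings())

    @defer.inlineCallbacks
    def crawl():
        for y in years:
            # One crawl per year, started only after the previous one finished
            yield runner.crawl(
                CnkiArticleCrossdbSpider,
                query_id=1609,
                query='(作者单位:河北工程技术学院(模糊)',
                filters=[dict(project="年度", value=f"{y}", text_or_title=f"{y}")],
            )
        reactor.stop()

    crawl()
    reactor.run()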