Compare commits

..

8 Commits

Author SHA1 Message Date
zhaoxiangpeng cf20521b1c cnki: duplicate-item update handling 2 months ago
zhaoxiangpeng 3507ba07ae cnki: tag items with their source type 2 months ago
zhaoxiangpeng bedba6c83f cnki: single-source (left-nav) API 2 months ago
zhaoxiangpeng 7920eaebbc cnki: source-type item 2 months ago
zhaoxiangpeng aa07aa05f1 cnki: rework enum usage; add a model for single-result search 2 months ago
zhaoxiangpeng be61990806 cnki: add two enum classes 2 months ago
zhaoxiangpeng 229f2f49f9 cnki: entry point for crawling citation counts 2 months ago
zhaoxiangpeng ad54448faf cnki: headers middleware handles the cookie string 2 months ago

@@ -31,6 +31,9 @@ CNKI_ARTICLE_DETAIL = 'https://kns.cnki.net/kcms/detail/detail.aspx?dbcode={db_c
 # -- legacy API
 CNKI_ADV_SEARCH_API = 'https://kns.cnki.net/kns8s/brief/grid'
+# single source (left-side navigation)
+SIGNAL_RESULT_API = "https://kns.cnki.net/kns8s/group/singleresult?language=CHS&uniplatform=NZKPT"
 # request headers used for search
 SEARCH_HEADERS = {
     'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
@@ -86,3 +89,5 @@ TABLE_HEAD = ['SrcDatabase-来源库', 'Title-题名', 'Author-作者', 'Organ-
 TABLE_HEAD_EN = ['src_db', 'title', 'author', 'org', 'journal', 'keyword', 'abstract', 'pub_time', 'first_duty', 'fund', 'year', 'volum', 'issue', 'page', 'classification_code', 'issn', 'url', 'doi']
 # number of records per download batch
 BATCH_DOWNLOAD_LIMIT = 50
+# number of search results per page
+BATCH_SEARCH_RESULT_LIMIT = 50

@@ -38,3 +38,9 @@ class CnkiCitedNumberItem(ArticleCitedItem):
     third_id = scrapy.Field()
     cited = scrapy.Field()
     updated_at = scrapy.Field()
+
+
+class CnkiArticeSourceItem(scrapy.Item):
+    __tablename__ = 'relation_sourcetype_cnki'
+    third_id = scrapy.Field()
+    source_types = scrapy.Field()

@@ -104,13 +104,25 @@ from scrapy.http.headers import Headers
 class CnkiSearchHeadersDownloaderMiddleware:
-    def __init__(self, custom_headers: dict):
+    def __init__(self, custom_headers: dict, cookies_str: str):
         self.custom_headers = custom_headers
+        self.custom_cookies = self._parse_cookies_str(cookies_str)

     @classmethod
     def from_crawler(cls, crawler):
-        return cls(custom_headers=crawler.settings['SEARCH_REQUEST_HEADERS'])
+        return cls(
+            custom_headers=crawler.settings['SEARCH_REQUEST_HEADERS'],
+            cookies_str=crawler.settings['SEARCH_REQUEST_COOKIES_STR']
+        )
+
+    def _parse_cookies_str(self, cookies_str):
+        cookies = {}
+        for cookie in cookies_str.split(';'):
+            key, value = cookie.split('=', 1)
+            cookies[key.strip()] = value.strip()
+        return cookies

     def process_request(self, request, spider):
         request.headers = Headers(self.custom_headers)
+        request.cookies = self.custom_cookies
         return None
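For context, a minimal sketch of the two settings the middleware now reads in from_crawler. The values here are illustrative placeholders (including the cookie names), not the project's real settings; note also that _parse_cookies_str raises ValueError on a segment without '=', such as one produced by a trailing ';', so the string should be a clean "k1=v1; k2=v2" copy from the browser:

    # settings.py -- illustrative values only; real headers/cookies come
    # from a logged-in CNKI browser session.
    SEARCH_REQUEST_HEADERS = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'User-Agent': 'Mozilla/5.0 ...',  # placeholder
    }
    # Raw Cookie header string; the middleware splits it on ';' and '='
    # and assigns the resulting dict to request.cookies.
    SEARCH_REQUEST_COOKIES_STR = 'Ecp_ClientId=XXX; SID_kns8=YYY'  # placeholders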

@@ -7,7 +7,11 @@ import json
 from typing import Union, List, Dict
 from datetime import datetime, timedelta
-from science_article_cnki.models.enum_cls import ResourceType, SearchTypeId, SearchFieldEnum, OperatorEnum, LogicEnum, SearchFromId
+from science_article_cnki.models.enum_cls import (
+    ResourceType, SourceDatabaseEnum,
+    SearchTypeId, SearchFieldEnum, OperatorEnum, LogicEnum,
+    SingleResultEnum
+)

 DB_CODE = {
     'CFLS': '总库',
@@ -53,7 +57,7 @@ def navigator_body(query: str = None, db_code: str = 'CFLS', **kwargs):
     return _param

-def signal_body(query: str = None, resource_type: str = 'JOURNAL', group_id: str = 'YE', **kwargs):
+def signal_body(query: str = None, resource_type: str = '学术期刊', group_id: str = 'YE', **kwargs):
     """
     Get the aggregation for a single category in the left navigation bar.
     :group_id: main topic 1; discipline 2; publication year 3; research level 4; document type 5; literature source 6; author 7; institution 8; fund 9
@@ -65,8 +69,8 @@ def signal_body(query: str = None, resource_type: str = 'JOURNAL', group_id: str
     _param = {
         'queryJson': json.dumps({
             "Platform": "",
-            "Resource": ResourceType[resource_type].name,
-            "Classid": ResourceType[resource_type].value,
+            "Resource": ResourceType[resource_type].value,
+            "Classid": SourceDatabaseEnum[resource_type].value,
             "Products": "",
             "QNode": {
                 "QGroup": [{
@@ -101,14 +105,14 @@ def signal_body(query: str = None, resource_type: str = 'JOURNAL', group_id: str
     return _param

-def refine_search(query: str, resource_type: str = 'JOURNAL', year=None, subject=None, code=None, **kwargs):
+def refine_search(query: str, resource_type: str = '学术期刊', year=None, subject=None, code=None, **kwargs):
     """
     Run an expert-style search, then refine it again by year.
     """
     _query = {
         "Platform": "",
-        "Resource": ResourceType[resource_type].name,
-        "Classid": ResourceType[resource_type].value,
+        "Resource": ResourceType[resource_type].value,
+        "Classid": SourceDatabaseEnum[resource_type].value,
         "Products": "",
         "QNode": {
             "QGroup": [{
@@ -400,6 +404,52 @@ def add_search_word(search_content: str, base_query: dict = None):
     return words_query

+def add_muti_group(
+        project: Union[SingleResultEnum, str],
+        value: str,
+        text_or_title: str,
+        base_query: dict = None,
+        **kwargs) -> dict:
+    """
+    Add a filter item.
+    Key <SingleResultEnum>: marker of the filter group
+    Items[].Title <>: display name of the filter item  input/@title
+    Items[].Name <>: display name of the filter item  input/@title
+    Items[].Key <>: code of the filter item  input/@value
+    Items[].Value <>: code of the filter item, identical to Items[].Key  input/@value
+    Items[].Value2 <>: empty string
+    Items[].Field <>: filter marker, identical to the group's Key
+    """
+    if isinstance(project, SingleResultEnum):
+        project = project.value
+    elif isinstance(project, str):
+        project = SingleResultEnum[project].value
+    child_item = {
+        "Key": project,
+        "Title": "",
+        "Logic": 0,
+        "Items": [
+            {
+                "Key": value,
+                "Title": text_or_title,
+                "Logic": 1,
+                "Field": project,
+                "Operator": "DEFAULT",
+                "Value": value,
+                "Value2": "",
+                "Name": project,
+                "ExtendType": 0
+            }],
+        "ChildItems": [
+        ]
+    }
+    if base_query:
+        add_limit_2query_body(child_item, "MutiGroup", base_query)
+    return child_item

 def limit_year_range(year: int, base_query: dict = None):
     """
     Add a year filter
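For orientation, a sketch of what add_muti_group produces for one left-nav filter. The value/title pair here is illustrative; in the spider further down it comes from the parsed input nodes of the single-result navigation page:

    from science_article_cnki.models import cnki_model as model
    from science_article_cnki.models.enum_cls import SingleResultEnum

    # SingleResultEnum.来源类别 -> "LYBSM" (source-category group)
    child = model.add_muti_group(
        project=SingleResultEnum.来源类别,
        value="P0209",            # illustrative input/@value code
        text_or_title="北大核心",  # illustrative input/@title text
    )
    # child == {
    #     "Key": "LYBSM", "Title": "", "Logic": 0,
    #     "Items": [{"Key": "P0209", "Title": "北大核心", "Logic": 1,
    #                "Field": "LYBSM", "Operator": "DEFAULT",
    #                "Value": "P0209", "Value2": "", "Name": "LYBSM",
    #                "ExtendType": 0}],
    #     "ChildItems": [],
    # }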
@@ -449,7 +499,7 @@ def parse_updatedtime_symbol(symbol: str, today: str = None) -> tuple:
     elif symbol == "最近半年":
         ago_day = today - timedelta(days=181)
     elif symbol == "最近一年":
-        ago_day = today.replace(year=today.year-1)
+        ago_day = today.replace(year=today.year - 1)
     elif symbol == "今年迄今":
         ago_day = today.replace(month=1, day=1)
     else:
@@ -494,7 +544,7 @@ def temp_refine_search(
         query: str,
         year: int = None,
         updated_date: str = None,
-        resource_type: str = 'JOURNAL',
+        resource_type: str = '学术期刊',
         **kwargs
 ):
     """
@@ -508,8 +558,8 @@ def temp_refine_search(
     """
     _query = {
         "Platform": "",
-        "Resource": ResourceType[resource_type].name,
-        "Classid": ResourceType[resource_type].value,
+        "Resource": ResourceType[resource_type].value,
+        "Classid": SourceDatabaseEnum[resource_type].value,
         "Products": "",
         "QNode": {
             "QGroup": [
@@ -567,7 +617,8 @@ def temp_query_search(query_body, query: str = None, page: int = 1, page_size: i
         "dstyle": "listmode",
         "boolSortSearch": "false",
         "aside": aside,
-        "searchFrom": "资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; " % {"updated_date": updated_date},
+        "searchFrom": "资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; " % {
+            "updated_date": updated_date},
         "subject": "",
         "language": "",
         "uniplatform": "",
@@ -584,7 +635,8 @@ def temp_query_search(query_body, query: str = None, page: int = 1, page_size: i
         'dstyle': 'listmode',
         'boolSortSearch': "false",
         'aside': '',
-        'searchFrom': '资源范围:学术期刊; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; ' % {"updated_date": updated_date},
+        'searchFrom': '资源范围:学术期刊; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; ' % {
+            "updated_date": updated_date},
         "subject": "",
         "language": "",
         "uniplatform": ""
@@ -595,6 +647,20 @@ def temp_query_search(query_body, query: str = None, page: int = 1, page_size: i
 adv_query_search = temp_query_search

+def single_result_nav(
+        queryJson: Union[dict, str],
+        groupId: Union[str, SingleResultEnum]
+) -> Dict[str, str]:
+    if isinstance(queryJson, dict):
+        queryJson = json.dumps(queryJson, ensure_ascii=False)
+    if isinstance(groupId, SingleResultEnum):
+        groupId = groupId.value
+    return dict(
+        queryJson=queryJson,
+        groupId=groupId,
+    )

 class SearchPaperArgModel:
     pass
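single_result_nav just normalizes the two form fields for the singleresult endpoint. A short sketch of the call as the new spider's start() uses it, with a truncated example query body:

    from science_article_cnki.models import cnki_model as model
    from science_article_cnki.models.enum_cls import SingleResultEnum

    query_body = {"Platform": "", "Resource": "JOURNAL"}  # truncated example
    form_d = model.single_result_nav(query_body, groupId=SingleResultEnum["来源类别"])
    # form_d == {"queryJson": '{"Platform": "", "Resource": "JOURNAL"}',
    #            "groupId": "LYBSM"}
    # POSTed as form data to config.SIGNAL_RESULT_API by the spider.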

@@ -9,14 +9,27 @@ from datetime import timedelta
 class ResourceType(enum.Enum):
     """Resource types"""
-    JOURNAL = "YSTT4HG0"  # academic journal
-    DISSERTATION = "LSTPFY1C"  # dissertation
-    CONFERENCE = "JUP3MUPD"  # conference
-    NEWSPAPER = "MPMFIG1A"  # newspaper
-    ALMANAC = "HHCPM1F8"
-    BOOK = "EMRPGLPA"
-    PATENT = "VUDIXAIY"
-    STANDARD = "WQ0UVIAA"
+    学术期刊 = JOURNAL = "JOURNAL"  # academic journal
+    学位论文 = DISSERTATION = "DISSERTATION"  # dissertation
+    会议 = CONFERENCE = "CONFERENCE"  # conference
+    报纸 = NEWSPAPER = "NEWSPAPER"  # newspaper
+    年鉴 = ALMANAC = "ALMANAC"
+    图书 = BOOK = "BOOK"
+    专利 = PATENT = "PATENT"
+    标准 = STANDARD = "STANDARD"
+    ACHIEVEMENTS = "ACHIEVEMENTS"
+
+
+class SourceDatabaseEnum(enum.Enum):
+    """Source-database ids"""
+    JOURNAL = 学术期刊 = "YSTT4HG0"  # academic journal
+    DISSERTATION = 学位论文 = "LSTPFY1C"  # dissertation
+    CONFERENCE = 会议 = "JUP3MUPD"  # conference
+    NEWSPAPER = 报纸 = "MPMFIG1A"  # newspaper
+    ALMANAC = 年鉴 = "HHCPM1F8"
+    BOOK = 图书 = "EMRPGLPA"
+    PATENT = 专利 = "VUDIXAIY"
+    STANDARD = 标准 = "WQ0UVIAA"
+    ACHIEVEMENTS = "BLZOG7CK"
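Because each member binds a Chinese name and an English name to the same value, the two names are Enum aliases of a single member, so either spelling works for lookups. A quick illustration using the names defined above:

    from science_article_cnki.models.enum_cls import ResourceType, SourceDatabaseEnum

    # The two names are aliases of one member, so identity holds:
    assert ResourceType["学术期刊"] is ResourceType.JOURNAL
    assert ResourceType["学术期刊"].value == "JOURNAL"

    # The same Chinese key selects the platform db-code in the other enum,
    # which is what lets the query builders do:
    #   "Resource": ResourceType[resource_type].value        -> "JOURNAL"
    #   "Classid": SourceDatabaseEnum[resource_type].value   -> "YSTT4HG0"
    assert SourceDatabaseEnum["学术期刊"].value == "YSTT4HG0"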
@@ -116,6 +129,17 @@ class LogicEnum(enum.Enum):
     NOT = 2

+class SingleResultEnum(enum.Enum):
+    """
+    Enum of group ids
+    """
+    年度 = YE = "YE"  # publication year
+    期刊 = "QK"  # journal
+    机构 = "AFC"  # institution
+    来源类别 = LYBSM = "LYBSM"  # source category
+    研究层次 = "YJCC"  # research level

 class UpdatedTimeEnum(enum.Enum):
     """
     Enum for a recent time span

@@ -41,6 +41,7 @@ class MongoPipeline(MongoDBUtils):
         super().__init__(mongo_uri, mongo_db)
         self.stats: StatsCollector = stats
         self.insert_failure_update_enable = True
+        self.duplicate_cover_enable = False  # overwrite duplicate items

     @classmethod
     def from_crawler(cls, crawler: Crawler):
@@ -71,7 +72,8 @@ class MongoPipeline(MongoDBUtils):
             logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
             d.pop("_id", None)
             [d.pop(k, None) for k in key_pattern.keys()]
-            up_result = collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
+            update_q = build_update_query(d, replace=self.duplicate_cover_enable)
+            up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
             self.stats.inc_value("item2db_updated/{}".format(item_type))
         except Exception:
             raise
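build_update_query is called here but defined outside this diff. A plausible sketch of its contract, assuming the replace flag switches between overwriting existing fields and only writing on insert; this is an assumption about intent, not the repository's actual helper:

    def build_update_query(doc: dict, replace: bool = False) -> dict:
        """Hypothetical reconstruction -- the real helper is not in this diff.

        replace=True  -> {"$set": doc}: duplicate documents get overwritten.
        replace=False -> {"$setOnInsert": doc}: with upsert=True, existing
        documents stay untouched and fields are only written on insert.
        """
        if replace:
            return {"$set": doc}
        return {"$setOnInsert": doc}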

@@ -0,0 +1,128 @@
from __future__ import annotations

import math
from copy import deepcopy
from datetime import datetime
from typing import TYPE_CHECKING, Any, Self
from pprint import pformat

import scrapy

from science_article_cnki.items import CnkiArticeSourceItem
from science_article_cnki.models.enum_cls import SingleResultEnum
from science_article_cnki.models import cnki_model as model
from science_article_cnki.utils.tools import str2int
from science_article_cnki.configs import cnki as config


class CnkiArticleTagSourceSpider(scrapy.Spider):
    name = "cnki_article_tag_source"
    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={
            "science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
        },
        ITEM_PIPELINES={
            "science_article_cnki.pipelines.MongoPipeline": 300,
            # "science_article_cnki.pipelines.verify_data.VerifyDataIntegrity": 400,
        },
        LOG_LEVEL="INFO"
    )
    query: str
    resource_type: str = "学术期刊"
    group: str = "来源类别"
    query_condition: dict

    async def start(self):
        # First fetch one grouped aggregation page to see each group's label and code
        m = dict(query=self.query, resource_type=self.resource_type, page=1,
                 **self.query_condition)
        m.update(filters=[])
        query_body = model.adv_refine_search(**m)
        form_d = model.single_result_nav(query_body, groupId=SingleResultEnum[self.group])
        yield scrapy.FormRequest(url=config.SIGNAL_RESULT_API, method="POST",
                                 formdata=form_d, meta=dict(REQUEST_Q=m))

    def parse(self, response, **kwargs):
        request_q = response.meta["REQUEST_Q"]
        project_field = response.xpath('//dd/@field').get()  # marker of the single category
        project_title = response.xpath('//dd/@tit').get()  # name of the single category
        nodes = response.xpath('//div[@class="resultlist"]/ul/li')
        priority = 0
        for node in nodes:
            # parse the grouped entries
            s_code = node.xpath('./input/@value').get()  # input code used for filtering
            s_text = node.xpath('./input/@text').get()  # value of the filter item
            s_title = node.xpath('./input/@title').get()  # displayed value (currently identical to the filter value)
            total_prm = node.xpath('./span/text()').re_first(r'\((.*?)\)')
            s_total = str2int(total_prm.replace(',', ''), 0)
            # max_page = math.ceil(s_total / config.BATCH_SEARCH_RESULT_LIMIT)
            # request_q['max_page'] = max_page
            group = dict(
                project=project_title,
                value=s_code,
                text_or_title=s_text,
            )
            self.logger.info("group: %s" % pformat(group))
            q_bak: dict = deepcopy(request_q)
            q_bak.update(group)
            q_bak.update(source_types=s_text)
            q_bak.setdefault('filters', []).append(group)
            query_body = model.adv_refine_search(**q_bak)
            model.add_muti_group(**q_bak,
                                 base_query=query_body)
            form_d = model.adv_query_search(query_body, **q_bak)
            priority -= 100  # give each category its own priority level
            yield scrapy.FormRequest(
                url=config.CNKI_ADV_SEARCH_API, method="POST",
                formdata=form_d, priority=priority,
                callback=self.parse_result,
                meta=dict(REQUEST_Q=q_bak)
            )
            # return

    def parse_result(self, response, **kwargs):
        priority = response.request.priority
        request_q = response.meta["REQUEST_Q"]
        msg = """current query: %(query)s,\nfilters: %(filters)s,\npage: %(page)s"""
        kws = {
            "query": request_q.get("query"),
            "filters": pformat(request_q.get("filters", [])),
            "page": '{c}/{m}'.format(c=request_q.get("page", 1), m=request_q.get("max_page", 'null'))
        }
        self.logger.info(msg % kws)
        # extract the number of search results
        total_prm = response.xpath('//span[@class="pagerTitleCell"]/em/text()').get()
        if not total_prm:
            self.logger.warning("response body: \n{resp}".format(resp=response.body))
            return
        if request_q.get("page", 1) == 1:
            total = str2int(total_prm.replace(',', ''))  # normalize the count string and convert to int
            # compute the total number of pages
            max_page = math.ceil(total / config.BATCH_SEARCH_RESULT_LIMIT)
            request_q['max_page'] = max_page
        tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr')
        for tr_node in tr_nodes:
            third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get()  # third-party id
            if third_id:
                st_item = CnkiArticeSourceItem()
                st_item['third_id'] = third_id
                st_item['source_types'] = [request_q.get("source_types")]
                yield st_item
        q_bak: dict = deepcopy(request_q)
        q_bak['page'] += 1
        if q_bak['page'] > q_bak['max_page']:
            self.logger.info("finished crawling the current category")
            return
        query_body = model.adv_refine_search(**q_bak)
        model.add_muti_group(**q_bak,
                             base_query=query_body)
        search_param = model.adv_query_search(query_body, **q_bak)
        yield scrapy.FormRequest(
            url=config.CNKI_ADV_SEARCH_API, method="POST",
            formdata=search_param, priority=priority,
            callback=self.parse_result,
            meta=dict(REQUEST_Q=q_bak)
        )
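Since query_condition is a dict, it cannot be passed through string-only `scrapy crawl -a` arguments, so the spider is easiest to launch from a script. A hypothetical launcher mirroring the crawl_cited_number.py script below; the module import path and the query string are assumptions for illustration:

    # Hypothetical launcher; import path and query are illustrative.
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    from science_article_cnki.spiders.cnki_article_tag_source import CnkiArticleTagSourceSpider

    def starter():
        init_params = {
            'query': '(作者单位:大连东软信息学院(模糊)',  # example expert query
            'query_condition': {'year': '2025'},          # forwarded to adv_refine_search
        }
        process = CrawlerProcess(get_project_settings())
        process.crawl(CnkiArticleTagSourceSpider, **init_params)
        process.start()

    if __name__ == '__main__':
        starter()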

@@ -17,10 +17,6 @@ if TYPE_CHECKING:
 class CnkiCitedNumberSpider(scrapy.Spider):
     name = "cnki_cited_number"
     custom_settings = dict(
-        DEFAULT_REQUEST_HEADERS={
-            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
-            "Accept-Language": "en",
-        },
         DOWNLOADER_MIDDLEWARES={
             "science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
         },
@@ -50,7 +46,8 @@ class CnkiCitedNumberSpider(scrapy.Spider):
         query_body = model.adv_refine_search(**m)
         search_param = model.adv_query_search(query_body, **m)
         yield scrapy.FormRequest(
-            url=config.CNKI_ADV_SEARCH_API, method="POST", formdata=search_param, meta=m
+            url=config.CNKI_ADV_SEARCH_API, method="POST",
+            formdata=search_param, meta=m
         )

     def parse(self, response, **kwargs):
@@ -84,7 +81,8 @@ class CnkiCitedNumberSpider(scrapy.Spider):
         query_body = model.adv_refine_search(**meta_copy)
         search_param = model.adv_query_search(query_body, **meta_copy)
         yield scrapy.FormRequest(
-            url=config.CNKI_ADV_SEARCH_API, method="POST", formdata=search_param,
+            url=config.CNKI_ADV_SEARCH_API, method="POST",
+            formdata=search_param,
             meta=meta_copy
         )

@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/5 09:18
# @Author : zhaoxiangpeng
# @File : crawl_cited_number.py

from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from science_article_cnki.spiders.cnki_cited_number import CnkiCitedNumberSpider

"""
def test_starter():
    y = 2025
    init_params = {
        'query': '(作者单位:河北工程技术学院(模糊)',
        'query_condition': {'year': str(y)}
    }
    process = CrawlerProcess(get_project_settings())
    process.crawl(CnkiCitedNumberSpider, **init_params)
    process.start()
"""


def starter_by_year():
    @defer.inlineCallbacks
    def f(range_list: list = None):
        for y in range_list:
            init_params = {
                'query': '(作者单位:大连东软信息学院(模糊)',
                'query_condition': {'year': str(y)}
            }
            yield process.crawl(CnkiCitedNumberSpider, **init_params)

    process = CrawlerProcess(get_project_settings())
    f(list(range(2021, 2026)))
    process.start()


def starter():
    process = CrawlerProcess(get_project_settings())
    process.crawl(CnkiCitedNumberSpider)
    process.start()


if __name__ == '__main__':
    starter_by_year()