Compare commits


14 Commits

Author SHA1 Message Date
zhaoxiangpeng 752521c87c test: CNKI tests 2 months ago
zhaoxiangpeng a0a8d05c61 change: 2 months ago
zhaoxiangpeng 7da4ac90c6 wos: download by search record 2 months ago
zhaoxiangpeng 53eb41e8d3 wos: download by id 2 months ago
zhaoxiangpeng 68306a03ab cnki: add spider 2 months ago
zhaoxiangpeng 2bd56aeb10 wos: add spider 2 months ago
zhaoxiangpeng 3e50a7acef wos: add model 2 months ago
zhaoxiangpeng 8773bc7b03 cnki: add model 2 months ago
zhaoxiangpeng 3b3a647fc7 wos: one-click run 2 months ago
zhaoxiangpeng 6e02472bfd cssci: add 2 months ago
zhaoxiangpeng 61129eadf2 cssci: collect by id 2 months ago
zhaoxiangpeng 7f16b4da3c cssci: add settings 2 months ago
zhaoxiangpeng 43b26550e7 wos: 2 months ago
zhaoxiangpeng c94aba0245 wos: add API documentation notes 2 months ago

@ -0,0 +1,119 @@
# pipelines/buffer_component.py
from typing import Dict, List, Any, Optional
from itemadapter import ItemAdapter
import time
class SimpleBuffer:
"""
简单的缓冲区组件只负责缓存管理不处理数据库插入
"""
def __init__(self, buffer_max_size: int = 100, flush_interval: int = 30):
self.buffer_max_size = buffer_max_size
self.flush_interval = flush_interval
        # buffers, keyed by item type
        self.buffers: Dict[str, List[Dict]] = {}
        self.total_size = 0
        # time-based flush control
        self.last_flush_time = time.time()
        # statistics
self.stats = {
'items_added': 0,
'buffers_flushed': 0,
'last_operation': None
}
def add_item(self, item: Any, item_type: str) -> bool:
"""
添加Item到缓冲区
Args:
item: 要添加的Item
item_type: Item类型标识
Returns:
bool: 是否触发了刷新
"""
# 初始化该类型的缓冲区
if item_type not in self.buffers:
self.buffers[item_type] = []
# 转换Item为字典
item_dict = self._item_to_dict(item)
# 添加到缓冲区
self.buffers[item_type].append(item_dict)
self.total_size += 1
self.stats['items_added'] += 1
# 检查是否需要刷新
should_flush = (
len(self.buffers[item_type]) >= self.buffer_max_size or
self._should_flush_by_time()
)
if should_flush:
self.last_flush_time = time.time()
return should_flush
    def get_buffer(self, item_type: str) -> List[Dict]:
        """Return the buffered data for the given item type."""
        return self.buffers.get(item_type, [])
    def get_all_buffers(self) -> Dict[str, List[Dict]]:
        """Return all buffered data."""
        return self.buffers.copy()
    def clear_buffer(self, item_type: str):
        """Clear the buffer for the given item type."""
        if item_type in self.buffers:
            self.total_size -= len(self.buffers[item_type])
            self.buffers[item_type].clear()
            self.stats['buffers_flushed'] += 1
    def clear_all_buffers(self):
        """Clear all buffers."""
        for item_type in list(self.buffers.keys()):
            self.clear_buffer(item_type)
    def get_buffer_size(self, item_type: str) -> int:
        """Return the size of the buffer for the given item type."""
        return len(self.buffers.get(item_type, []))
    def get_total_size(self) -> int:
        """Return the total number of buffered items."""
        return self.total_size
    def should_flush(self, item_type: str) -> bool:
        """Check whether a flush is needed."""
        return (
            self.get_buffer_size(item_type) >= self.buffer_max_size or
            self._should_flush_by_time()
        )
    def _should_flush_by_time(self) -> bool:
        """Check whether a flush is due based on elapsed time."""
        return time.time() - self.last_flush_time >= self.flush_interval
    def _item_to_dict(self, item: Any) -> Dict[str, Any]:
        """Convert an item to a plain dict."""
        if hasattr(item, 'items'):  # already a dict or dict-like object
            return dict(item)
        else:
            adapter = ItemAdapter(item)
            return dict(adapter)
    def get_stats(self) -> Dict[str, Any]:
        """Return statistics about the buffer."""
buffer_sizes = {k: len(v) for k, v in self.buffers.items()}
return {
**self.stats,
'buffer_sizes': buffer_sizes,
'total_buffered': self.total_size,
'buffer_types': list(self.buffers.keys())
}

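A minimal sketch of how a pipeline might drive SimpleBuffer (the pipeline class, the import path and the _flush_all writer are assumptions for illustration; only SimpleBuffer itself comes from this commit):

# hypothetical usage sketch, not part of the commit
from science_article_cnki.pipelines.buffer_component import SimpleBuffer

class BufferedWriterPipeline:
    def __init__(self):
        self.buffer = SimpleBuffer(buffer_max_size=100, flush_interval=30)

    def process_item(self, item, spider):
        # add_item() returns True when the size or time threshold is hit
        if self.buffer.add_item(item, item_type=type(item).__name__):
            self._flush_all()
        return item

    def close_spider(self, spider):
        # drain whatever is still buffered when the spider stops
        self._flush_all()

    def _flush_all(self):
        for item_type, rows in self.buffer.get_all_buffers().items():
            if rows:
                print(f"flushing {len(rows)} {item_type} rows")  # a real pipeline would bulk-insert here
        self.buffer.clear_all_buffers()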
@ -8,7 +8,7 @@ from typing import Union, List, Dict
from datetime import datetime, timedelta
from science_article_cnki.models.enum_cls import (
-    ResourceType, SourceDatabaseEnum,
+    ResourceType, SourceDatabaseEnum, ResourceLanguageEnum, KuaKuCodeEnum,
    SearchTypeId, SearchFieldEnum, OperatorEnum, LogicEnum,
    SingleResultEnum
)
@ -436,6 +436,17 @@ def add_search_word(search_content: str, base_query: dict = None):
    return words_query
+def add_muti_filters(base_query: dict, filters: List[dict] = None):
+    """
+    Apply multiple filter groups to a single search query.
+    """
+    if not filters:
+        return base_query
+    for filter in filters:
+        add_muti_group(**filter, base_query=base_query)
+    return base_query
def add_muti_group(
    project: Union[SingleResultEnum, str],
    value: Union[str, List[str]],
@ -586,6 +597,7 @@ def temp_refine_search(
    year: int = None,
    updated_date: str = None,
    resource_type: str = '学术期刊',
+    lang: Union[str, ResourceLanguageEnum] = ResourceLanguageEnum.中文,
    **kwargs
):
    """
@ -594,9 +606,12 @@
    :param updated_date: update time: unlimited, last week / last month / last half-year / last year, year to date, previous year
    :param year: the year to filter on; if used together with updated_date, apply the year filter after restricting the update time
    :param resource_type:
+    :param lang:
    :param kwargs:
    :return:
    """
+    if isinstance(lang, str):
+        lang = ResourceLanguageEnum[lang]
    _query = {
        "Platform": "",
        "Resource": ResourceType[resource_type].value,
@ -614,8 +629,8 @@
        },
        "ExScope": "1",
        "SearchType": 1,
-        "Rlang": "CHINESE",
-        "KuaKuCode": "",
+        "Rlang": lang.value,
+        "KuaKuCode": KuaKuCodeEnum[resource_type].value,
        "Expands": {},
        "View": "changeDBOnlyFT",
        "SearchFrom": 1
@ -644,6 +659,16 @@ def temp_query_search(query_body, query: str = None, page: int = 1, page_size: i
    :param updated_date: defaults to unlimited
    :return:
    """
+    def getFromString():
+        rr = kwargs.get('resource_type', "总库")
+        d = ""
+        d += f"资源范围:{rr}; "
+        time_range = '更新时间:%(updated_date)s' % {"updated_date": updated_date}
+        d += f'时间范围:{time_range}; '
+        if rr == '学术期刊':
+            d += '来源类别:全部期刊'
+            d += '; '
+        return d
    page = str(page)
    page_size = str(page_size)
    if page == '1':
@ -658,8 +683,7 @@ def temp_query_search(query_body, query: str = None, page: int = 1, page_size: i
        "dstyle": "listmode",
        "boolSortSearch": "false",
        "aside": aside,
-        "searchFrom": "资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; " % {
-            "updated_date": updated_date},
+        "searchFrom": getFromString(),
        "subject": "",
        "language": "",
        "uniplatform": "",

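A condensed sketch of how the new filter helpers are meant to compose (it mirrors CnkiLatestIncrementSpider.start later in this diff; the adv_refine_search / adv_query_search calls and the filter shape are taken from that spider and the starter scripts, while the query value is made up):

# illustrative only; the query string and year are made up
from science_article_cnki.models import cnki_model as model

query_body = model.adv_refine_search(query='(作者单位:某某大学(模糊)', resource_type='学术期刊', page=1)
# each dict in `filters` is splatted into an add_muti_group(project=..., value=..., text_or_title=...) call
model.add_muti_filters(base_query=query_body, filters=[
    dict(project="年度", value="2025", text_or_title="2025"),
])
form_data = model.adv_query_search(query_body, query='(作者单位:某某大学(模糊)', page=1)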
@ -39,6 +39,11 @@ class ProductsEnum(enum.Enum):
    pass
+class KuaKuCodeEnum(enum.Enum):
+    总库 = 'YSTT4HG0,LSTPFY1C,JUP3MUPD,MPMFIG1A,EMRPGLPA,WQ0UVIAA,BLZOG7CK,PWFIRAGL,NN3FJMUV,NLBO1Z6R'
+    学术期刊 = ''
class ResourceLanguageEnum(enum.Enum):
    中文 = "CHINESE"
    外文 = "FOREIGN"

@ -0,0 +1,40 @@
from typing import AsyncIterator, Any
import scrapy
from science_article_cnki.models import cnki_model as model
from science_article_cnki.configs import cnki as config
class CnkiLatestIncrementSpider(scrapy.Spider):
name = "cnki_latest_increment"
custom_settings = dict(
DOWNLOADER_MIDDLEWARES={
"science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
},
ITEM_PIPELINES={
"science_article_cnki.pipelines.MongoPipeline": 300,
"science_article_cnki.pipelines.DupTodoPipeline": 310,
# "science_article_cnki.pipelines.verify_data.VerifyDataIntegrity": 400,
},
# LOG_LEVEL="INFO"
)
source = 'cnki'
resource_type: str = "学术期刊"
query_id: int
query: str
filters: list = list()
async def start(self) -> AsyncIterator[Any]:
m = dict(query=self.query, resource_type=self.resource_type, page=1)
m.update(filters=self.filters)
query_body = model.adv_refine_search(**m)
        # merge the filter groups into the query body
model.add_muti_filters(base_query=query_body, filters=m.get("filters"))
form_d = model.adv_query_search(query_body, **m)
yield scrapy.FormRequest(url=config.CNKI_ADV_SEARCH_API, method="POST",
formdata=form_d, meta=dict(REQUEST_Q=m))
def parse(self, response):
pass

@ -94,7 +94,7 @@ def add_year2item(item, year: Union[int, None], pub_datetime):
    if dt:
        year = dt.year
    if year:
-        item.year = year
+        item['year'] = year
    return item

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/12 14:13
# @Author : zhaoxiangpeng
# @File : crawl_crossdb_article.py
from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_cnki.spiders.cnki_article_conference import CnkiArticleConferenceSpider
def starter_by_year():
@defer.inlineCallbacks
def f(range_list: list = None):
for y in range_list:
init_params = {
'query': '(作者单位:河北工程技术学院(模糊)',
'filters': [
dict(project="年度", value=f"{y}", text_or_title=f"{y}"),
]
}
yield process.crawl(CnkiArticleConferenceSpider, **init_params)
process = CrawlerProcess(get_project_settings())
f(list(range(2021, 2022)))
process.start()
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(CnkiArticleConferenceSpider)
process.start()
if __name__ == '__main__':
starter_by_year()

@ -31,8 +31,8 @@ def starter_more_year():
    @defer.inlineCallbacks
    def f(years: list = None):
        init_params = {
-            'query_id': 1611,
-            'query': '(作者单位:武昌首义学院(模糊)',
+            'query_id': 1609,
+            'query': '(作者单位:河北工程技术学院(模糊)',
            'filters': [
                dict(project="年度", value=[f"{y}" for y in years], text_or_title=[f"{y}" for y in years]),
            ]

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# @Time : 2026/2/28 09:36
# @Author : zhaoxiangpeng
# @File : crawl_article_latest.py
import time
from typing import List
import pymysql
from pymysql import cursors
from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_cnki.spiders.cnki_latest_increment import CnkiLatestIncrementSpider
def get_connect() -> pymysql.Connection:
conn: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
database='science_data_dept', user='science-data-dept',
passwd='datadept1509', )
return conn
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(CnkiLatestIncrementSpider)
process.start()
def starter_latest_by_record(record_id: int):
@defer.inlineCallbacks
def f():
client: pymysql.Connection = get_connect()
cursor = client.cursor(cursors.DictCursor)
cursor.execute(
'select b.id as task_id, q.id as query_id, q.content as content, b.task_condition as task_condition, q.source_type as source_type, b.is_done as is_done from task_batch_record as b join task_search_strategy as q on b.query_id=q.id where b.id=%s and q.source_type=5 limit 1',
(record_id,))
result = cursor.fetchone()
query_id = result['query_id']
cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
org_results: List[dict] = cursor.fetchall()
result['org_id'] = [org_result['org_id'] for org_result in org_results]
result['org_name'] = [org_result['org_name'] for org_result in org_results]
init_params = result
        # NOTE: this hard-coded block references an undefined `years` and would
        # discard the record loaded from MySQL above, so it is left disabled
        # init_params = {
        #     'query_id': 1609,
        #     'query': '(作者单位:河北工程技术学院(模糊)',
        #     'filters': [
        #         dict(project="年度", value=[f"{y}" for y in years], text_or_title=[f"{y}" for y in years]),
        #     ]
        # }
yield process.crawl(CnkiLatestIncrementSpider, task_obj=init_params)
process = CrawlerProcess(get_project_settings())
f()
process.start()
process.stop()
if __name__ == '__main__':
starter_latest_by_record(8057)

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/13 14:54
# @Author : zhaoxiangpeng
# @File : test_item_exists.py
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from science_article_cnki.db_utils.mongo import MongoDBUtils
from science_article_cnki.settings import MONGO_URI, MONGO_DATABASE
client: MongoClient = MongoClient(MONGO_URI)
db: Database = client[MONGO_DATABASE]
def test_item_exists():
collection: Collection = db.get_collection('data_cnki_article')
results = collection.find_one(filter={"third_id": {"$in": ['SCJI202502004']}}, projection={"_id": 0, "third_id": 1})
print(results)

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/13 16:08
# @Author : zhaoxiangpeng
# @File : test_more_so.py
from parsel import Selector
TABLE_HEAD_EN = ['src_db', 'title', 'author', 'org', 'journal', 'keyword', 'abstract', 'pub_time', 'first_duty', 'fund', 'year', 'volum', 'issue', 'page', 'classification_code', 'issn', 'url', 'doi']
def test_parser():
    with open(r'Y:\cnki-metadata\CNKI-20260112161602991.xls', encoding='utf-8') as f:
data = f.read()
print(data)
selector = Selector(data)
rows = selector.xpath(r'//tr')
for row in rows[1:]:
cols = row.xpath('./td')
row_datas = []
for col in cols:
col_data = col.xpath('string(.)').get().strip()
row_datas.append(col_data)
data = dict(zip(TABLE_HEAD_EN, row_datas))
if data.get('src_db') == 'SrcDatabase-来源库':
continue
print(data)

@ -28,6 +28,7 @@ if TYPE_CHECKING:
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
+logging.getLogger('kafka').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@ -253,7 +254,7 @@ class KafkaPipeline:
        future = self.producer.send(
            topic=self.topic,
            value=d,
-            headers=[{'source_type': b'cssci'}]
+            headers=[('source_type', b'cssci')]
        )
        future.add_callback(self.on_send_success)
        future.add_callback(self.on_send_success)
@ -270,6 +271,9 @@ class KafkaPipeline:
    def build2kafka(self, item: dict) -> dict:
        dd = dict(
            id=item.get("third_id"),
-            **item.get('detailed')
+            school_id="999",
+            **item.get('detailed'),
+            updated_time="2025-11-01 09:01:56"
        )
+        dd.pop("references", None)
        return dd

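For reference, a sketch of the message shape build2kafka now produces (the id value is borrowed from a CSSCI third_id seen later in this diff; the remaining keys depend on item['detailed'] and are not shown):

# illustrative only, not part of the commit
example_message = {
    "id": "11G0412025010007",
    "school_id": "999",
    "updated_time": "2025-11-01 09:01:56",
    # ...plus every key from item['detailed'] except 'references', which is popped
}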
@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/21 16:45
# @Author : zhaoxiangpeng
# @File : firld_parser.py
import json
from datetime import datetime
from typing import Dict, Callable, Any, List
import pandas as pd
class ScopusFieldParsing:
@staticmethod
def parse_basic_information(frame: Dict[str, dict]) -> Dict[str, Any]:
return dict(
id=frame.get("sno"),
title=frame.get("lypm"),
title_format=handle_format_str(frame.get("lypm")),
abstract=None,
url='http://cssci.nju.edu.cn/control/controllers.php?control=search&action=source_id&id=' + frame.get("sno", ''),
article_type_string=frame.get("subtypeDescription"),
doi=frame.get("prism:doi"),
)
@staticmethod
def parse_date_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
date = frame.get("prism:coverDate")
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
publicationdate = source.get("publicationdate")
def f():
results = [dict(
pub_year=publicationdate.get("year"),
v_month=publicationdate.get("month"),
v_day=publicationdate.get("day")
)]
return json_dumps(results, ensure_ascii=False)
return dict(
vyear=publicationdate.get("year"),
pub_date=f(),
ea_year=None,
ea_month=None,
)
@staticmethod
def parse_article_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
return dict(
volume=frame.get("prism:volume"),
issue=frame.get("prism:issueIdentifier"),
lang=frame.get("language").get("@xml:lang"),
pages=None,
startpage=frame.get("prism:startingPage"),
endpage=frame.get("prism:endingPage"),
)
@staticmethod
def parse_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
issn_list = source.get("issn")
if isinstance(issn_list, dict):
issn_list = [issn_list]
issn = None
eissn = None
for issn_obj in issn_list:
if issn_obj.get('@type') == "print":
issn = issn_obj.get("$")
elif issn_obj.get('@type') == "electronic":
eissn = issn_obj.get("$")
else:
issn = issn_obj.get("$")
return dict(
journal=frame.get("prism:publicationName"),
journal_format=handle_format_str(frame.get("prism:publicationName"), str_type="en"),
issn=FormatUtil.formatISSN(issn),
eissn=FormatUtil.formatISSN(eissn),
cn=None,
isbn=None,
)
@staticmethod
def parse_meeting_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
return dict(
meeting_name=None,
meeting_time=None,
meeting_address=None,
)
@staticmethod
def parse_publish_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
return dict(
publisher=None,
pub_city=None,
pub_country=source.get("@country"),
)
@staticmethod
def parse_author_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
author_group: List[dict] = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("author-group", [])
orcid_list = []
for group in author_group:
affiliation: dict = group.get("affiliation", {})
author_list: List[dict] = group.get("author", [])
for author_obj in author_list:
surname = author_obj.get("ce:surname")
given_name = author_obj.get("ce:given-name")
auid = author_obj.get("@auid")
orcid = author_obj.get("@orcid")
if orcid:
orcid_list.append(orcid)
result_dict = process_author_address_relation_row(frame)
return dict(
email=None,
researcher_id=None,
orc_id='; '.join(orcid_list) if orcid_list else None,
author_order=result_dict['author_order'],
address_order=result_dict['address_order'],
relation_author_address=result_dict['relation_author_address'],
)
@staticmethod
def parse_other_information(frame: Dict[str, dict]) -> Dict[str, Any]:
authkeywords = frame.get("authkeywords", {})
auth_keywords = authkeywords.get("author-keyword", [])
keywords = json_dumps([auth_keyword.get("$") for auth_keyword in auth_keywords], ensure_ascii=False)
subject_areas = frame.get("subject-areas", {}).get("subject-area", [])
sub_areas = json_dumps([subject_area.get("$") for subject_area in subject_areas], ensure_ascii=False)
return dict(
key_words=keywords,
sub_code=sub_areas,
source_type="2",
wos_we_tag=None,
)
def _parsing(self, row) -> Dict[str, Any]:
scopus_json = row.get('scopus_json')
df_dict = json.loads(scopus_json)
df = df_dict.get("abstracts-retrieval-response")
new_dict = dict()
new_dict.update(self.parse_basic_information(df))
new_dict.update(self.parse_date_information(df))
new_dict.update(self.parse_article_source_information(df))
new_dict.update(self.parse_source_information(df))
new_dict.update(self.parse_meeting_information(df))
new_dict.update(self.parse_publish_information(df))
new_dict.update(self.parse_author_information(df))
new_dict.update(self.parse_other_information(df))
new_dict.update(dict(updated_time=row.get('updated_time')))
return new_dict
def parsing(self, df: pd.DataFrame = None):
result = df[['scopus_json', 'updated_time']].apply(self._parsing, axis=1)
pdf_result = list(result.values)
return pdf_result

@ -107,4 +107,4 @@ COOKIE_POOL_REDIS_KEY = 'cookies_pool:cssci:session'
COOKIE_REDIS_TTL = 60 * 60 * 6
KAFKA_SERVERS = ['hadoop01:9092', 'hadoop02:9092', 'hadoop03:9092']
-KAFKA_TOPIC = "test2kafka"  #
+KAFKA_TOPIC = "testWosTopic"  #

@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = science_article_cssci.settings
[deploy]
#url = http://localhost:6800/
project = science_article_cssci

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/20 17:06
# @Author : zhaoxiangpeng
# @File : crawl_article_by_id.py
import time
import logging
import json
from typing import List
import redis
from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_cssci.spiders.cssci_article_by_id import CssciArticleByIdSpider
def push_task():
settings = get_project_settings()
r = redis.StrictRedis.from_url(settings.get("REDIS_URL"))
r.lpush(
"cssci_article_by_id:start_urls",
*[
json.dumps({'third_id': '11G0412025010007'}, ensure_ascii=False),
json.dumps({'third_id': '11C1172023010002'}, ensure_ascii=False),
json.dumps({'third_id': '11J0092023020008'}, ensure_ascii=False),
json.dumps({'third_id': '44Z0712023010003'}, ensure_ascii=False),
json.dumps({'third_id': '11D1022023010001'}, ensure_ascii=False),
json.dumps({'third_id': '22D1042023010007'}, ensure_ascii=False),
])
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(CssciArticleByIdSpider)
process.start()
if __name__ == '__main__':
push_task()
starter()

File diff suppressed because one or more lines are too long

@ -0,0 +1,32 @@
class Settings:
env = "dev"
SEARCH_ROUTE = '/api/wosnx/core/runQuerySearch'
EXPORT_ROUTE = '/api/wosnx/indic/export/saveToFile'
DB_CHANGE_ELE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science Core Collection"]'
QUERY_INPUT_ELE = '//*[@id="advancedSearchInputArea"]'
SEARCH_BUTTON_ELE = '//button[@data-ta="run-search"]/span[@class="mat-mdc-button-touch-target"]'
EXPORT_BUTTON_ELE = '//*[@id="export-trigger-btn"]'
    TABWIN_BUTTON_ELE = '//*[@id="exportToTabWinButton"]'  # "Tab delimited file" button
    RECORD_TYPE_SELECT_ELE = '//div[@class="ng-star-inserted"]/wos-select/button[@aria-haspopup="listbox"]'  # record-content selector
    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record"]'  # "Full Record" option
    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record and Cited References"]'  # "Full Record and Cited References" option
    RECORD_RANGE_ELE = '//*[@id="radio3-input"]'  # records-range radio
RECORD_EXPORT_START_ELE = '//input[@name="markFrom"]'
RECORD_EXPORT_END_ELE = '//input[@name="markTo"]'
EXPORT_FILE_ELE = '//*[@id="exportButton"]'
INPUT_CONTENT = '(OG=(Anhui University of Science & Technology)) AND PY=(2025)'
class ProSettings(Settings):
DB_CHANGE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science 核心合集"]'
    EXPORT_BUTTON_ELE = '//button[@id="export-trigger-btn"]'
    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="完整记录"]'  # "Full Record" option (Chinese UI)
    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="全记录与引用的参考文献"]'  # "Full Record and Cited References" option (Chinese UI)
settings = Settings()

@ -109,6 +109,9 @@ class ScienceArticleWosDownloaderMiddleware:
class WosStarterApiXkeyDownloaderMiddleware:
+    """
+    https://api.clarivate.com/swagger-ui/?apikey=none&url=https%3A%2F%2Fdeveloper.clarivate.com%2Fapis%2Fwos-starter%2Fswagger
+    """
    async def process_request(self, request, spider):
        key_param = {
            'X-ApiKey': '53b8164e7543ccebe489988287e8b871bc2c0880'
@ -185,10 +188,13 @@ class WosCookieMiddleware:
    def get_sid_from_redis(self):
        val = self.redis_cli.rpoplpush(self.cookiepool_cache_key, self.cookiepool_cache_key)
        if val:
-            self.redis_cli.hincrby(f'{self.redis_key_prefix}:{val}', 'used_times', 1)
+            self.inc_used_times(val)
            return val
        return None
+    def inc_used_times(self, val: str = None):
+        self.redis_cli.hincrby(f'{self.redis_key_prefix}:{val}', 'used_times', 1)
    def mark_sid_status(self, sid: str, status: str = 'validate'):
        """
        :param sid:
@ -221,6 +227,11 @@ class WosCookieMiddleware:
        self.redis_cli.delete(f'{self.cookiepool_cache_key}')
+class WosSessionMiddleware:
+    def process_request(self, request: Request, spider: Spider):
+        pass
class A:
    def __init__(self, redis_cli):
        self.redis_cli = redis_cli

@ -92,7 +92,6 @@ def starter_documents_get(q, db: WosDB = WosDB.WOS.name, limit: int = config.WOS
    :param detail: full data by default; if 'short', fewer fields are returned (uid, links{record,citingArticles,references,related}, citations[{db,count}], identifiers{doi,issn})
    :param kwargs:
    :return:
-    """
    _query_params: List[Tuple[str, str]] = []
    _query_params.append(("q", q))
    if db: pass
@ -102,6 +101,16 @@ def starter_documents_get(q, db: WosDB = WosDB.WOS.name, limit: int = config.WOS
    if detail is not None:
        _query_params.append(("detail", detail))
    return _query_params
+    """
+    _query_params: Dict[str, Any] = dict()
+    _query_params.setdefault("q", q)
+    if db: pass
+    _query_params.setdefault("db", db)
+    _query_params.setdefault("limit", limit)
+    _query_params.setdefault("page", page)
+    if detail is not None:
+        _query_params.setdefault("detail", detail)
+    return _query_params
def make_advanced_search_ut(query: str = None, wos_ids: List = None, limit: int = 50, col_name: str = "WOS") -> Dict[

@ -19,7 +19,7 @@ from pymongo.errors import (
    DuplicateKeyError,
    BulkWriteError
)
-from science_article_wos.items import WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem
+from science_article_wos.items import ArticleItem, WosArticleItem, WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem
from science_article_wos.db_utils.mongo import MongoDBUtils, update_document, build_update_query
if TYPE_CHECKING:
@ -139,6 +139,15 @@ class MongoPipeline(MongoDBUtils):
        return 'items_null_table'
+class Article2MongoPipeline(MongoPipeline):
+    def process_item(self, item, spider):
+        # determine the item type
+        if isinstance(item, ArticleItem):
+            super().process_item_update(item, spider=spider)
+        return item
class CitedRelation2MongoPipeline(MongoPipeline):
    def process_item(self, item, spider):
        # determine the item type
@ -206,3 +215,75 @@ class DupTodoBySciencePipeline(DupTodoPipeline):
            self.inc_item_dropped_count("exists")
            return True
        return False
class VerifyDataIntegrity:
def __init__(self, mongo_uri, mongo_db):
self.successful_delete = False
self.batch_ids = set()
self.successful = []
self.logger = logging.getLogger(__name__)
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client: MongoClient = None
self.db = None
@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
c = cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
)
return c
def init_db(self):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def open_spider(self, spider):
spider_batch_ids = spider.get_batch_ids()
for batch in spider_batch_ids:
if batch.get("field", "UT") == "UT":
self.batch_ids.add(batch.get("third_id"))
self.init_db()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
if isinstance(item, ArticleItem):
unique_id = adapter.get("third_id")
self.successful.append(unique_id)
if self.successful_delete:
self.batch_ids.discard(unique_id)
return item
def close_spider(self, spider):
failure = self.batch_ids - set(self.successful)
coll = self.db.get_collection("todo_ids_wos")
if self.successful:
if self.successful_delete:
coll.delete_many(filter={"third_id": {"$in": self.successful}})
self.logger.info("Successfully deleted %d articles", len(self.successful))
else:
coll.update_many(filter={"third_id": {"$in": self.successful}}, update={"$set": {"state": 1}})
self.logger.info("Successfully updated %d articles", len(self.successful))
if failure:
self.logger.warning("未下载到: %s" % list(failure))
coll.update_many(filter={"third_id": {"$in": list(failure)}}, update={"$set": {"state": -1}})
else:
self.logger.info("Successfully verified: %s" % "下载完整无异常")
def spider_end(self):
"""
组合检索式把结果写到数据库里
"""
dict(
content="",
qeury_id="",
records_found=0,
perfact=1,
state=1,
reason=""
)

@ -16,7 +16,7 @@ import redis
import requests
from DrissionPage import Chromium
-from science_article_wos.utils.xpath_cfg import Settings
+from science_article_wos.configs.wos_dp import Settings
if TYPE_CHECKING:
    from DrissionPage import ChromiumPage, ChromiumOptions
@ -97,7 +97,7 @@ class DPOperations:
        if clear_input:
            input_area_ele.clear()  # clear the input
        if content is None:
-            content = "(OG=(Shanghai Jiao Tong University)) AND PY=(2025)"
+            content = "(OG=(Shanghai Jiao Tong University)) AND PY=(2026)"
        input_area_ele.input(content)  # type in the search query
    @staticmethod
@ -328,11 +328,13 @@ class CookieManager:
            logger.warning("cookie usage limit reached / verification required, running the verification flow...")
            # verification logic: perform one export to pass the check
            self.intercept_verify(op_func=self.dp_ins.bypass_ops)
+            self.sid2redis()
        elif status == "expired":
            logger.warning("cookie expired, fetching a new one...")
            # refresh the page or re-run the search/export
            self.intercept_verify(op_func=self.refresh_page)
+            self.sid2redis()
        else:
            logger.info(f"Cookie status OK: {status}")
@ -392,7 +394,9 @@ class CookieManager:
def main():
-    manager = CookieManager(redis_uri="redis://:kcidea1509@192.168.1.211:6379/10", keep_browser_alive=True)
+    from science_article_wos.settings import REDIS_URL
+    # manager = CookieManager(redis_uri="redis://:kcidea1509@192.168.1.211:6379/10", keep_browser_alive=True)
+    manager = CookieManager(redis_uri=REDIS_URL, keep_browser_alive=True)
    try:
        manager.start_monitor()

@ -23,7 +23,7 @@ ROBOTSTXT_OBEY = False
# Concurrency and throttling settings
#CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 1
-DOWNLOAD_DELAY = 1
+DOWNLOAD_DELAY = 0
# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

@ -5,7 +5,7 @@ from scrapy import signals
from scrapy.http import Response
from scrapy.http.request.json_request import JsonRequest
-from .database import DatabaseSpider
+# from .database import DatabaseSpider
from science_article_wos.items import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
from science_article_wos.scripts.wos_parse_data import parse_full_records
from science_article_wos.utils import model
@ -19,7 +19,7 @@ def maybe_list(val: Union[int, List[int]]) -> List[int]:
    return list(val)
-class DownloadBySearchRecordSpider(DatabaseSpider):
+class DownloadBySearchRecordSpider(scrapy.Spider):
    name = "download_by_search_record"
    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={

@ -0,0 +1,140 @@
import os
import json
from datetime import datetime
from typing import List, Dict, Union, Any, Self
import scrapy
from scrapy.http.request.json_request import JsonRequest
from scrapy.crawler import Crawler
from science_article_wos.items import WosArticleItem, WosCitedNumberItem
from science_article_wos.scripts.wos_parse_data import parse_full_records_txt
from science_article_wos.models import wos_model as model
from science_article_wos.utils import tools
from science_article_wos.configs import wos as config
def _parse_download(body: Union[bytes, str]):
"""
    Parse the exported records contained in the download response.
"""
batch_time = datetime.now()
if isinstance(body, str):
body = body.encode()
item_g = parse_full_records_txt(body)
parse_count = 0
for data_dic in item_g:
t_id = data_dic.pop('ut', None)
if t_id:
parse_count += 1
article_item = WosArticleItem()
article_item['third_id'] = t_id
article_item['exported'] = data_dic
article_item['updated_at'] = batch_time
yield article_item
            # parse the citation count
if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
cited_item = WosCitedNumberItem()
cited_item['third_id'] = t_id
cited_item['cited'] = cited_num
cited_item['updated_at'] = batch_time
yield cited_item
class WosArticleDownloadByIdSpider(scrapy.Spider):
name = "wos_article_download_by_id"
custom_settings = dict(
FILE_STORAGE_DIR=r"Y:\wos-metadata\wos increment-202603\01",
DOWNLOADER_MIDDLEWARES={
"science_article_wos.middlewares.WosCookieMiddleware": 500
},
ITEM_PIPELINES={
"science_article_wos.pipelines.Article2MongoPipeline": 300,
"science_article_wos.pipelines.VerifyDataIntegrity": 400,
},
LOG_LEVEL="INFO"
)
def __init__(self, task_obj, file_storage_dir: str = None, **kwargs):
scrapy.Spider.__init__(self)
self.file_storage_dir = file_storage_dir
self.id_list: List[Dict[str, str]] = task_obj
self._records_found = 0
@classmethod
def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
settings = crawler.settings
from pymongo import MongoClient
client = MongoClient(settings.get("MONGO_URI"))
db = client.get_database(settings.get("MONGO_DATABASE"))
collection = db.get_collection("todo_ids_wos")
def f():
cursor = collection.find(filter={"state": 0}, projection={"state": 0}).limit(500)
d = [c for c in cursor]
if not d:
cursor = collection.find(filter={"state": 2}, projection={"_id": 0, "state": 0}).limit(500)
d = [c for c in cursor]
else:
_ids = [x.pop("_id", None) for x in d]
collection.update_many(filter={"_id": {"$in": _ids}}, update={"$set": {"state": 2}})
return d
tasks = f()
kwargs.update({"task_obj": tasks})
kwargs['file_storage_dir'] = settings.get("FILE_STORAGE_DIR")
return super().from_crawler(crawler, *args, **kwargs)
def make_query(self) -> str:
third_ids = []
for idT in self.id_list:
third_ids.append('%s=(%s)' % (idT.get('field', 'UT'), idT.get('third_id')))
todo_query = ' OR '.join(third_ids)
return todo_query
def get_batch_ids(self) -> List[Dict[str, str]]:
return self.id_list
async def start(self):
if not os.path.exists(self.file_storage_dir):
os.makedirs(self.file_storage_dir)
qu = self.make_query()
yield JsonRequest(
config.WOS_ADVANCED_SEARCH_API, method='POST', data=model.make_advanced_search_ut(query=qu),
)
def parse(self, response, **kwargs):
meta = response.meta
request = response.request
query_id, records_found = model.get_record_info(response.body)
if (not query_id) or (records_found == 0):
self.logger.warning("""
未找到记录
错误信息 %s
请求信息 %s""" % (response.text, request))
return
else:
self.set_records_found(records_found)
mark_start = 1
yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
mark_to=records_found),
meta={'QUERY_ID': query_id, 'QUERY': meta.get('QUERY'),
'FILENAME': meta.get("FILENAME"),
'RECORDS_FOUND': records_found, 'MARK_START': mark_start,
'MARK_END': records_found},
cb_kwargs=dict(filename=meta.get("FILENAME"), query_id=query_id),
callback=self.download_parse)
def download_parse(self, response, query_id: str = None, **kwargs):
filename = query_id or response.meta.get('FILENAME')
file_export_path = os.path.join(self.file_storage_dir, '%s.txt' % filename)
with open(file_export_path, 'wb') as f:
f.write(response.body)
yield from _parse_download(response.body)
def set_records_found(self, val):
self._records_found = val
def get_records_found(self) -> int:
return self._records_found

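Taken together with the VerifyDataIntegrity pipeline added above, this spider appears to drive a small state machine on the todo_ids_wos collection. A sketch of the flow as it can be inferred from the queries in this diff (the document shape and the WOS UT value are illustrative):

# inferred from the queries above; field names are as they appear in the diff
# state 0:  pending      -> from_crawler() claims up to 500 ids and marks them state 2
# state 2:  in progress  -> re-queried when nothing is left in state 0
# state 1:  downloaded   -> set by VerifyDataIntegrity.close_spider() for parsed third_ids
# state -1: failed       -> set for ids that were requested but never parsed
example_todo = {"third_id": "WOS:000123456700001", "field": "UT", "state": 0}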
@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/19 10:01
# @Author : zhaoxiangpeng
# @File : crawl_article_by_id.py
import time
import logging
from typing import List
import pymysql
from pymysql import cursors
from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_wos.spiders.wos_article_download_by_id import WosArticleDownloadByIdSpider
from science_article_wos.utils import tools
logger = logging.getLogger(__name__)
def starter_forever():
def check_task() -> bool:
from pymongo import MongoClient
cli = MongoClient(settings.get("MONGO_URI"))
db = cli[settings.get("MONGO_DATABASE")]
r = db['todo_ids_wos'].find_one(filter={"state": 0})
if r:
return True
return False
def check_session() -> bool:
from redis import Redis
cli = Redis.from_url(settings.get("REDIS_URL"), decode_responses=True)
keys = cli.keys('cookie_pool:wos_sid:*')
for key in keys:
            # read the stored session info
status = cli.hget(key, "status")
if status == "normal":
real_sid = key.rsplit(':', maxsplit=1)[-1]
return True
return False
@defer.inlineCallbacks
def f(running: bool = True):
while running:
            # query MongoDB for pending tasks
            if not check_task():
                logger.info("No tasks left to download, exiting")
                running = False
                continue
            # check Redis for a usable cookie
            if not check_session():
                logger.info("No usable cookie available, waiting")
                time.sleep(60 * 5)
                continue
yield process.crawl(WosArticleDownloadByIdSpider)
time.sleep(60 * 2)
settings = get_project_settings()
process = CrawlerProcess(settings)
f(True)
process.start()
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(WosArticleDownloadByIdSpider)
process.start()
if __name__ == '__main__':
starter_forever()

@ -112,7 +112,31 @@ def starter_latest_all():
        client.close()
    process = CrawlerProcess(get_project_settings())
-    f(True)
+    f(running=True)
    process.start()
    process.stop()
+def starter_latest_by_record(record_id: int):
+    @defer.inlineCallbacks
+    def f():
+        client: pymysql.Connection = get_connect()
+        cursor = client.cursor(cursors.DictCursor)
+        cursor.execute(
+            'select b.id as task_id, q.id as query_id, q.content as content, b.task_condition as task_condition, q.source_type as source_type, b.is_done as is_done from task_batch_record as b join task_search_strategy as q on b.query_id=q.id where b.id=%s and q.source_type=1 limit 1',
+            (record_id,))
+        result = cursor.fetchone()
+        query_id = result['query_id']
+        cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
+        org_results: List[dict] = cursor.fetchall()
+        result['org_id'] = [org_result['org_id'] for org_result in org_results]
+        result['org_name'] = [org_result['org_name'] for org_result in org_results]
+        init_params = result
+        yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
+    process = CrawlerProcess(get_project_settings())
+    f()
+    process.start()
+    process.stop()
@ -132,3 +156,4 @@ def starter():
if __name__ == '__main__':
    starter_latest_all()
+    # starter_latest_by_record(8278)
