wos: incremental collection

main
zhaoxiangpeng 3 weeks ago
parent a95f242bd5
commit 1bf0703dba

@ -1,3 +1,11 @@
sqlalchemy~=1.3.24
requests~=2.32.4
scrapy~=2.13.3
pymongo~=4.13.0
itemadapter~=0.11.0
happybase~=1.2.0
fastapi~=0.116.1
redis~=6.2.0
parsel~=1.10.0
sympy~=1.14.0
pydantic~=2.0.3

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# @Time : 2024/1/16 8:41
# @Author : zhaoxiangpeng
# @File : config.py
from datetime import datetime
# Data source name
SOURCE_NAME = 'wos'
WOS_SEARCH_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch"
WOS_DETAIL_LINK = 'https://webofscience.clarivate.cn/wos/woscc/full-record/{wos_id}'
WOS_DETAIL_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch'
WOS_ADVANCED_SEARCH_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch'
WOS_EXPORT_FILE_API = 'https://webofscience.clarivate.cn/api/wosnx/indic/export/saveToFile'
WOS_RECORD_STREAM_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryGetRecordsStream"
WOS_REFINE_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryRefine"
# WOS starter api
WOS_STARTER_DOCUMENT_UID_API = "https://api.clarivate.com/apis/wos-starter/v1/documents/{uid}" # Unique Identifier/Accession Number
WOS_STARTER_DOCUMENT_API = "https://api.clarivate.com/apis/wos-starter/v1/documents"
WOS_STARTER_PER_PAGE_LIMIT = 50 # max number of records per page
# WOS lite api
WOS_LITE_QUERY_FIRST_API = 'https://wos-api.clarivate.com/api/woslite' # the first request returns a query ID
WOS_LITE_QUERY_API = 'https://wos-api.clarivate.com/api/woslite/query' # paginate using that query ID
# Publications collection
WOS_ARTICLE_COLLECTION = 'data_{}_article'.format(SOURCE_NAME)
# Citation-count collection
WOS_CITED_NUMBER_COLLECTION = "relation_cited_number_{}".format(SOURCE_NAME)
# School-publication relation collection
SCHOOL_RELATION_COLLECTION = 'relation_school_{}'.format(SOURCE_NAME)
# References collection
WOS_REFERENCE_COLLECTION = "relation_reference_{}".format(SOURCE_NAME)
# Collection of IDs pending download
ARTICLE_TODO_IDS_COLLECTION = "todo_ids_{}".format(SOURCE_NAME)
# Publications collection for the CSCD source
WOS_CSCD_ARTICLE_COLLECTION = 'data_{}_article_{}'.format(SOURCE_NAME, 'cscd')
# Cookie pool configuration
# COOKIE_POOL_CONFIG = dict(host=setting.REDIS_HOST, port=6379, db=setting.REDIS_DB, password=setting.REDIS_PASSWORD)
COOKIE_POOL_GROUP = 'cookies_pool:wos:sid*'
COOKIE_POOL_KEY = 'cookies_pool:wos:sid-sjtu'
COOKIE_TTL = 60 * 60 * 4
# Number of records per downloaded file
BATCH_DOWNLOAD_LIMIT = 500
# Default record filter when exporting files
DEFAULT_EXPORT_RECORD_FILTER = "fullRecordPlus" # fullRecordPlus
# Table-header validation constants
SUCCESS_TABLE_HEAD_START = b'\xef\xbb\xbfPT'
LOST_TABLE_HEAD_START = b'\xef\xbb\xbfnull'
AUTO_TABLE_HEAD_START = b'\xef\xbb\xbfPT\tAU\tBA\tBE\tGP\tAF\tBF\tCA\tTI\tSO\tSE\tBS\tLA\tDT\tCT\tCY\tCL\tSP\tHO\tDE\tID\tAB\tC1\tC3\tRP\tEM\tRI\tOI\tFU\tFP\tFX\tCR\tNR\tTC\tZ9\tU1\tU2\tPU\tPI\tPA\tSN\tEI\tBN\tJ9\tJI\tPD\tPY\tVL\tIS\tPN\tSU\tSI\tMA\tBP\tEP\tAR\tDI\tDL\tD2\tEA\tPG\tWC\tWE\tSC\tGA\tPM\tOA\tHC\tHP\tDA\tUT\r\n'
CORE_NAME_TABLE = dict(
WOSCC="Web of Science Core Collection",
BCI="BIOSIS Citation Index",
SCIELO="SciELO Citation Index",
RSCI="Russian Science Citation Index",
CSCD="Chinese Science Citation Database℠",
ARCI="Arabic Citation Index",
DIIDW="Derwent Innovations Index",
PPRN="",
PQDT="ProQuest ™ Dissertations & Theses Citation Index"
)
NAV_NAME_TABLE = dict(
SCI="Science Citation Index Expanded (SCI-Expanded)",
ESCI="Emerging Sources Citation Index (ESCI)",
SSCI="Social Sciences Citation Index (SSCI)",
ISTP="Conference Proceedings Citation Index Science (CPCI-S)",
BSCI="Book Citation Index Science (BKCI-S)",
AHCI="Arts & Humanities Citation Index (A&HCI)",
IC="Index Chemicus (IC)",
ISSHP="Conference Proceedings Citation Index Social Sciences & Humanities (CPCI-SSH)"
)
TASK_CONFIG = {
"school_id": 83,
"school_name": "北京林业大学",
"search_policy": """OG=(Beijing Forestry University)""",
"crawl_year": [2021, 2022, 2023],
"source_type": 1,
"priority": 10,
"is_important": 1,
"update_interval": 60 * 60 * 24 * 14,
"create_time": datetime.now(),
"last_time": datetime.now(),
"next_time": datetime.now(),
"state": 0
}
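A minimal sketch (not part of the commit) of how a scheduler could derive the next run time from TASK_CONFIG; only the timedelta arithmetic is assumed here:
from datetime import timedelta
# Hypothetical scheduling step: advance next_time by update_interval seconds.
task = dict(TASK_CONFIG)
task["last_time"] = datetime.now()
task["next_time"] = task["last_time"] + timedelta(seconds=task["update_interval"])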

@ -0,0 +1,92 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional, Dict, Tuple
from pymongo import MongoClient
from pymongo import UpdateOne
from pymongo.errors import DuplicateKeyError, BulkWriteError
if TYPE_CHECKING:
from pymongo.database import Database
from pymongo.collection import Collection
from pymongo.results import InsertOneResult, InsertManyResult, BulkWriteResult
def build_update_query(update_data: dict, replace: bool = True) -> dict:
"""
If replace is True, existing document fields are overwritten directly with $set;
otherwise list values are merged with $addToSet and scalar values with $set.
"""
update_query = {}
if not update_data:
return {}
for key, val in update_data.items():
if replace:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
else:
if isinstance(val, list):
update_query.setdefault(
"$addToSet", {}
).update({
key: {"$each": val}
})
else:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
return update_query
def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]:
"""
Return a (filter, update) pair; the update clause is built with build_update_query.
"""
if not update_data:
return {}, {}
return filter_query, build_update_query(update_data, replace=replace)
class MongoDBUtils:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client: Optional[MongoClient] = None
self.db: Optional[Database] = None
def insert2db(self, items, tablename, **kwargs) -> InsertOneResult:
collection: Collection = self.db.get_collection(tablename)
result: InsertOneResult = collection.insert_one(items, **kwargs)
return result
def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult:
collection: Collection = self.db.get_collection(tablename)
result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs)
return result
def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult:
collection: Collection = self.db.get_collection(tablename)
bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs)
return bulk_results
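For reference, a small usage sketch of build_update_query; the document values below are illustrative only:
doc = {"third_id": "WOS:000000000001", "task_ids": [12], "cited": 5}
build_update_query(doc, replace=True)
# -> {"$set": {"third_id": "WOS:000000000001", "task_ids": [12], "cited": 5}}
build_update_query(doc, replace=False)
# -> {"$set": {"third_id": "WOS:000000000001", "cited": 5},
#     "$addToSet": {"task_ids": {"$each": [12]}}}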

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/14 16:17
# @Author : zhaoxiangpeng
# @File : extensions.py
import logging
import pymysql
from scrapy import signals
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
class LatestSpiderProtocol:
name: str
record_id: int
org_id: int
org_name: str
query_id: int
query_content: str
def get_records_found(self) -> int: ...
class ACKExtension:
def __init__(self, crawler: Crawler):
self.crawler = crawler
self.change_state_sql = 'update task_batch_record set %(update_kws)s where %(update_cond)s'
@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler=crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
return ext
def spider_opened(self, spider):
kws = {
'is_done': 2,
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_closed(self, spider: LatestSpiderProtocol):
"""
# update the task state
# notify
"""
kws = {
'is_done': 1,
'result_count': spider.get_records_found(),
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_error(self, spider: LatestSpiderProtocol):
kws = {
'is_done': -1,
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def _execute_sql(self, sql):
settings = self.crawler.settings
client = pymysql.connect(
host=settings.get('MYSQL_HOST'),
port=settings.get('MYSQL_PORT', 3306),
database=settings.get('MYSQL_DATABASE'),
user=settings.get('MYSQL_USER'),
passwd=settings.get('MYSQL_PASSWORD'),
)
try:
cursor = client.cursor()
cursor.execute(sql)
cursor.connection.commit()
logger.info(f'Execute SQL: {sql}')
except Exception as e:
logger.exception(e)
finally:
client.close()
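For illustration, assuming a task record with id 42 and 1234 records found (made-up values), spider_closed renders and executes the statement below:
# update task_batch_record set is_done=1, result_count=1234, updated_time=CURRENT_TIMESTAMP where id=42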

@ -0,0 +1,58 @@
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class ScienceArticleWosItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
pass
class AddItemBase(scrapy.Item):
third_id = scrapy.Field()
updated_at = scrapy.Field()
class ArticleItem(AddItemBase):
exported = scrapy.Field()
class ArticleCitedItem(AddItemBase):
cited = scrapy.Field()
class WosArticleItem(ArticleItem):
"""wos文章item"""
__tablename__ = 'data_wos_article'
third_id = scrapy.Field()
exported = scrapy.Field()
updated_at = scrapy.Field()
class WosIdRelationItem(AddItemBase):
__tablename__ = 'relation_school_wos'
query_ids = scrapy.Field()
school_ids = scrapy.Field()
task_ids = scrapy.Field()
class WosArticleTodoIdItem(scrapy.Item):
__tablename__ = 'todo_ids_wos'
third_id = scrapy.Field()
state = scrapy.Field()
class WosCitedNumberItem(ArticleCitedItem):
"""Citation-count item for a publication"""
__tablename__ = 'relation_cited_number_wos'
third_id = scrapy.Field()
cited = scrapy.Field()
updated_at = scrapy.Field()
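A quick sketch of how these items are filled by the spider; the uid and count below are placeholders:
from datetime import datetime
cited = WosCitedNumberItem()
cited['third_id'] = 'WOS:000000000001'  # placeholder WOS uid
cited['cited'] = 12
cited['updated_at'] = datetime.now()
todo = WosArticleTodoIdItem(third_id='WOS:000000000001', state=0)
# __tablename__ tells the Mongo pipelines which collection each item is written to:
# WosCitedNumberItem -> 'relation_cited_number_wos', WosArticleTodoIdItem -> 'todo_ids_wos'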

@ -108,6 +108,15 @@ class ScienceArticleWosDownloaderMiddleware:
spider.logger.info("Spider opened: %s" % spider.name) spider.logger.info("Spider opened: %s" % spider.name)
class WosStarterApiXkeyDownloaderMiddleware:
async def process_request(self, request, spider):
key_param = {
'X-ApiKey': '53b8164e7543ccebe489988287e8b871bc2c0880'
}
request.headers.update(key_param)
# return request
class WosCookieMiddleware:
def __init__(self, redis_uri: str):
self.redis_cli = redis.from_url(redis_uri, decode_responses=True)

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/14 14:20
# @Author : zhaoxiangpeng
# @File : __init__.py

@ -0,0 +1,331 @@
# -*- coding: utf-8 -*-
# @Time : 2023/7/13 9:40
# @Author : zhaoxiangpeng
# @File : model.py
import json
import enum
import warnings
from typing import List, Tuple, Any, Dict, Union
from urllib.parse import urlencode
from science_article_wos.configs import wos as config
false = False
true = True
null = None
class WosDB(enum.Enum):
WOS = 1
CSCD = 2
class AnalyzesEnum(enum.Enum):
WOSCC = ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6", "DR.Value.6", "ECR.Value.6",
"PY.Field_D.6", "DT.Value.6", "AU.Value.6", "DX2NG.Value.6", "PEERREVIEW.Value.6"]
CSCD = ["TP.Value.6", "DR.Value.6", "OA.Value.6", "PY.Field_D.6", "DT.Value.6", "SJ.Value.6", "AU.Value.6",
"OG.Value.6", "SO.Value.6"]
ColNameMap = dict(WOS='WOSCC', CSCD='CSCD')
def calculate_next_page(next_page: int = 1, page_size: int = 100):
"""
Compute the cursor (record index) for the next page.
:param next_page: page number of the next page
:param page_size: number of records per page
:return:
"""
return (next_page - 1) * page_size + 1
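# Worked example: with the default page_size of 100,
# next_page=1 -> cursor 1, next_page=3 -> (3 - 1) * 100 + 1 = 201.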
def lite_base_model(usr_query: str, db_id: int = None, first_record: int = 1, page_size: int = 100, **kwargs):
if db_id is None:
db_id = 1
if first_record > 1e5:
warnings.warn('first_record must be between 1 and 100000')
model = {
'databaseId': WosDB(db_id).name,
'firstRecord': first_record,
'count': page_size,
'usrQuery': usr_query
}
# return urlencode(model)
return model
def lite_query_model(db_id: int = None, first_record: int = 1, page_size: int = 100, **kwargs):
if db_id is None:
db_id = 1
model = {
'databaseId': WosDB(db_id).name,
'firstRecord': first_record,
'count': page_size,
}
return urlencode(model)
def starter_documents_uid_get(uid, detail: str = None):
"""
:param uid:
:param detail:
:return:
"""
_query_params: List[Tuple[str, str]] = []
if detail is not None:
_query_params.append(("detail", detail))
return _query_params
def starter_documents_get(q, db: str = WosDB.WOS.name, limit: int = config.WOS_STARTER_PER_PAGE_LIMIT, page: int = 1, sort_field: str = None,
modified_time_span=None, tc_modified_time_span=None, detail=None, **kwargs):
"""
:param q:
:param db:
:param limit: maximum 50
:param page: with limit=50 the valid range is 1~2000, i.e. at most 100,000 records
:param sort_field:
:param modified_time_span:
:param tc_modified_time_span:
:param detail: full records by default; if 'short', fewer fields are returned (uid, links{record,citingArticles,references,related}, citations[{db,count}], identifiers{doi,issn})
:param kwargs:
:return:
"""
_query_params: List[Tuple[str, str]] = []
_query_params.append(("q", q))
if db:
_query_params.append(("db", db))
_query_params.append(("limit", limit))
_query_params.append(("page", page))
if detail is not None:
_query_params.append(("detail", detail))
return _query_params
def make_advanced_search_ut(query: str = None, wos_ids: List = None, limit: int = 50, col_name: str = "WOS") -> Dict[
str, Any]:
if query is None:
if wos_ids is None:
raise ValueError('one of query or wos_ids must be provided')
query = ' OR '.join([f'UT=({wos_id})' for wos_id in wos_ids])
# map the custom short name to the core/collection name
product = ColNameMap[col_name]
model = {
"product": product,
"searchMode": "general",
"viewType": "search",
"serviceMode": "summary",
"search": {
"mode": "general",
"database": product,
"query": [
{
"rowText": query
}
],
"sets": [],
"options": {
"lemmatize": "On"
}
},
"retrieve": {
"count": limit,
"history": True,
"jcr": True,
"sort": "relevance",
"analyzes": getattr(AnalyzesEnum, product).value
},
"eventMode": None,
"isPreprintReview": False
}
return model
def export_search_data_to_txt(
q_id: str,
mark_from: int = 1,
mark_to: int = 500,
col_name: str = "WOS",
filters: str = config.DEFAULT_EXPORT_RECORD_FILTER
) -> Dict[str, Any]:
"""
Export the retrieved records.
:param q_id: the result-set id (QueryID) returned by a search
:param mark_from: first record to export (inclusive)
:param mark_to: last record to export (inclusive)
:param col_name: source collection / core
:param filters: fullRecord (full record only) / fullRecordPlus (full record plus cited references)
:return:
"""
if mark_to - mark_from >= 500:  # cap each export at 500 records (inclusive range)
mark_to = mark_from + 499
model = {"parentQid": q_id, "sortBy": "relevance",
"displayTimesCited": "true", "displayCitedRefs": "true", "product": "UA", "colName": col_name,
"displayUsageInfo": "true", "fileOpt": "othersoftware", "action": "saveToTab",
"markFrom": str(mark_from), "markTo": str(mark_to),
"view": "summary", "isRefQuery": "false", "locale": "zh_CN", "filters": filters}
return model
def article_detail_model(uts: Union[List[str], str], core: str = "WOSCC"):
"""
Detail page: https://webofscience.clarivate.cn/wos/woscc/full-record/{wos_id}
API: https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch
:param uts:
:param core:
:return:
"""
if isinstance(uts, str):
uts = [uts]
model = {
"eventMode": null,
"isPreprintReview": false,
"product": core,
"retrieve": {
"first": 1, "links": "retrieve", "sort": "relevance", "count": 1, "view": "super",
"coll": null, "activity": false, "analyzes": null, "jcr": true, "reviews": true,
"highlight": null,
"secondaryRetrieve": {
"associated_data": {
"sort": "relevance", "count": 10
},
"cited_references": {
"sort": "author-ascending", "count": 30
},
"citing_article": {
"sort": "date", "count": 2, "links": null, "view": "mini"
},
"cited_references_with_context": {
"sort": "date", "count": 135, "view": "mini"
},
"recommendation_articles": {
"sort": "recommendation-relevance", "count": 5, "links": null, "view": "mini"
},
"grants_to_wos_records": {
"sort": "date-descending", "count": 30, "links": null, "view": "mini"
}
}
},
"search": {
"database": core,
"mode": "record_ids",
"uts": uts
},
"searchMode": "record_ids",
"viewType": "search",
"serviceMode": "summary",
}
return model
# Model dedicated to citing-article (times-cited) queries
def get_wos_core_cites(
uts_or_qid: str,
year_range: tuple = None,
core: str = "WOSCC",
parent_db: str = "WOSCC",
is_refine: bool = False
):
"""
https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch
:param uts_or_qid:
:param year_range: year range to refine by
:param core: database/core to search
:param parent_db:
:param is_refine: whether this is a refine query
:return:
"""
model = {
"eventMode": null,
"isPreprintReview": false,
"product": core,
"search": {"database": core, "mode": "citing_article", "parentDatabase": parent_db,
"parentDoc": null,
"parentId": {"type": "colluid", "value": uts_or_qid},
"parentQid": null, "parentSort": null},
# "retrieve": {
# "sort": "date-descending",
# "count": 50,
# "jcr": true,
# "history": true,
# "analyzes": ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6",
# "DR.Value.6", "ECR.Value.6", "PY.Field_D.6", "DT.Value.6", "AU.Value.6",
# "DX2NG.Value.6", "PEERREVIEW.Value.6"]
# },
"searchMode": "citing_article",
"serviceMode": "summary",
"viewType": "search",
}
refines = []
if year_range:
is_refine = True
years = list(range(*year_range)) + [year_range[-1]]
refines.append(dict(
index="PY", value=[str(year) for year in years]
))
if refines:
model.update({"refines": refines})
if is_refine:
model.setdefault("qid", uts_or_qid)
model.pop("search")
model.pop("isPreprintReview")
model.update(viewType="refine")
return model
def get_aggregation_wos_cited(q_id: str, core: str = "WOSCC"):
"""
Get the aggregated citation counts per core/edition.
https://webofscience.clarivate.cn/api/wosnx/core/runQueryGetRecordsStream
"""
model = {
"product": core,
"qid": q_id,
"retrieve": {
"analyzes": ["EDN.Value.200"]
},
"searchMode": "citing_article",
"viewType": "records"
}
return model
def get_refine_count(q_id: str, count: int = 5):
model = {
"eventMode": null,
"product": "WOSCC",
"qid": q_id,
"refines": [
{"index": "EDN", "value": ["WOS.SCI", "WOS.SSCI", "WOS.AHCI"]}
],
# "retrieve": {
# "count": count, "sort": "date-descending", "history": true, "jcr": true,
# "analyzes": ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6",
# "DR.Value.6", "ECR.Value.6", "PY.Field_D.6", "DT.Value.6", "AU.Value.6",
# "DX2NG.Value.6", "PEERREVIEW.Value.6"]
# },
"searchMode": "citing_article",
"serviceMode": "summary",
"viewType": "refine",
}
return model
def get_record_info(body: bytes, sep: Union[str, bytes] = b'\n'):
resp_texts = body.strip().split(sep)
query_id = None
records_found = 0
for resp_text in resp_texts:
resp_row_dict: dict = json.loads(resp_text)
if resp_row_dict.get("key") == "searchInfo":
query_id = resp_row_dict.get("payload", {}).get("QueryID")
records_found = resp_row_dict.get("payload", {}).get("RecordsFound") # number of records found
break # stop once found
return query_id, records_found
if __name__ == '__main__':
m1 = lite_base_model('OG=(Beijing Forestry University)', db_id=WosDB.WOS.value)
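A sketch of get_record_info on a newline-delimited JSON body; the payload shape below is assumed from the keys the function reads ("searchInfo", "QueryID", "RecordsFound"):
sample = b'{"key": "searchInfo", "payload": {"QueryID": "abc123", "RecordsFound": 250}}\n{"key": "records", "payload": {}}'
qid, found = get_record_info(sample)
# qid == "abc123", found == 250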

@ -0,0 +1,208 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Tuple, Union, Optional
import scrapy
from scrapy import signals
from itemadapter import ItemAdapter
from pymongo import MongoClient
from pymongo.errors import (
DuplicateKeyError,
BulkWriteError
)
from science_article_wos.items import WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem
from science_article_wos.db_utils.mongo import MongoDBUtils, update_document, build_update_query
if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
from pymongo.collection import Collection
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class ScienceArticleWosPipeline:
def process_item(self, item, spider):
return item
class MongoPipeline(MongoDBUtils):
def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
super().__init__(mongo_uri, mongo_db)
self.stats: StatsCollector = stats
self.insert_failure_update_enable = True
self.duplicate_cover_enable = False # overwrite duplicate documents
@classmethod
def from_crawler(cls, crawler: Crawler):
m = cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
stats=crawler.stats
)
return m
def open_spider(self, spider):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider) -> scrapy.Item:
"""
Insert the item; duplicate-key errors are counted and otherwise ignored.
"""
adapter = ItemAdapter(item)
tablename = self._get_item_table(item)
collection = self.db.get_collection(tablename)
d = adapter.asdict()
try:
collection.insert_one(d)
self.stats.inc_value("item2db_inserted/{}".format(tablename))
except DuplicateKeyError as duplicate_error:
self.stats.inc_value("item2db_duplicate/{}".format(tablename))
self.stats.inc_value(f"item_dropped_reasons_count/duplicate")
except Exception:
raise
return item
def process_item_update(self, item, spider) -> scrapy.Item:
"""
Insert the item; on a duplicate-key error, fall back to an update.
"""
adapter = ItemAdapter(item)
tablename = self._get_item_table(item)
collection = self.db.get_collection(tablename)
d = adapter.asdict()
try:
collection.insert_one(d)
self.stats.inc_value("item2db_inserted/{}".format(tablename))
except DuplicateKeyError as duplicate_error:
if self.insert_failure_update_enable:
write_error = duplicate_error.details
filter_query, update_query = self._pick_filter_update(write_error, doc=d)
updated_at_query = None # strip the volatile updated_at so it does not skew the update; not needed when updating fields other than task_ids
key_pattern = write_error.get('keyPattern')
key_value = write_error.get('keyValue')
logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
# special handling for incremental tasks
task_ids = update_query.pop("task_ids", None)
if task_ids:
# task_ids always changes the document, so handle it first
task_id_query = {'task_ids': task_ids}
collection.update_one(filter=filter_query, update=build_update_query(task_id_query, replace=False))
updated_at_query = {"updated_at": update_query.pop('updated_at', None)}
update_q = build_update_query(update_query, replace=self.duplicate_cover_enable)
up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
if up_result.matched_count == up_result.modified_count == 1:
# refresh updated_at when the document changed (unchanged docs are refreshed too, so we know when the record was last touched)
if updated_at_query:
collection.update_one(filter=key_value, update={"$set": updated_at_query})
self.stats.inc_value("item2db_updated/{}".format(tablename))
except Exception:
raise
return item
@staticmethod
def _pick_filter_update(write_error, doc: dict = None):
original_doc = write_error.get('op', doc) # the document that was being inserted
key_pattern = write_error.get('keyPattern')
original_doc.pop("_id", None) # 删掉插入失败产生的_id
filter_query = {}
update_query = {key: val for key, val in original_doc.items() if val}
for key in key_pattern.keys():
filter_query.update({key: update_query.pop(key, None)})
return filter_query, update_query
def close_spider(self, spider):
self.client.close()
@staticmethod
def _get_item_table(item) -> str:
"""获取Item类型"""
if hasattr(item, '__tablename__'):
return item.__class__.__tablename__
return 'items_null_table'
class CitedRelation2MongoPipeline(MongoPipeline):
def process_item(self, item, spider):
# check the item type
if isinstance(item, WosCitedNumberItem):
super().process_item_update(item, spider=spider)
return item
class SchoolRelation2MongoPipeline(MongoPipeline):
def process_item(self, item, spider):
# check the item type
if isinstance(item, WosIdRelationItem):
super().process_item_update(item, spider=spider)
return item
class DupTodoPipeline(MongoPipeline):
def process_item(self, item, spider):
if isinstance(item, WosArticleTodoIdItem):
if self.is_exists(item, self._get_dup_key(spider)):
return item
super().process_item(item, spider=spider)
return item
def is_exists(self, item, filter_key) -> bool:
fingerprints = item.get('third_id')
collection: Collection = self.db.get_collection(filter_key)
results = collection.find_one(filter={"third_id": fingerprints},
projection={"_id": 0, "third_id": 1, "exported.da": 1})
if isinstance(results, dict) and results.get('exported') and results.get('third_id') == fingerprints:
self.inc_item_dropped_count("exists")
return True
return False
def _get_dup_key(self, spider):
return 'data_%(source_type)s_article' % {"source_type": spider.source}
def inc_item_dropped_count(self, reason):
self.stats.inc_value("item_dropped_count")
self.stats.inc_value(f"item_dropped_reasons_count/{reason}")
class DupTodoBySciencePipeline(DupTodoPipeline):
dup_collection: Optional[Collection] = None
# def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
# super().__init__(mongo_uri, mongo_db, stats=stats)
# self.dup_collection = None
def open_spider(self, spider):
super().open_spider(spider)
settings = spider.settings
dup_uri = settings.get("MONGO_URI_SCIENCE")
cli = MongoClient(dup_uri)
dup_db = cli.get_database(settings.get("MONGO_DATABASE_SCIENCE"))
self.dup_collection = dup_db.get_collection('wos_raw_data')
def is_exists(self, item, filter_key) -> bool:
fingerprints = item.get('third_id')
results = self.dup_collection.find_one(filter={"_id": fingerprints},
projection={"_id": 1})
if isinstance(results, dict) and (results.get('_id') == fingerprints):
self.inc_item_dropped_count("exists")
return True
return False

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# @Time : 2024/3/5 16:05
# @Author : zhaoxiangpeng
# @File : parse_data.py
import logging
from typing import Union
from science_article_wos.utils.tools import str2int
logger = logging.getLogger(__name__)
DEFAULT_TABLE_HEAD = ['PT', 'AU', 'BA', 'BE', 'GP', 'AF', 'BF', 'CA', 'TI', 'SO', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'ID', 'AB', 'C1', 'C3', 'RP', 'EM', 'RI', 'OI', 'FU', 'FP', 'FX', 'CR', 'NR', 'TC', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'PN', 'SU', 'SI', 'MA', 'BP', 'EP', 'AR', 'DI', 'DL', 'D2', 'EA', 'PG', 'WC', 'WE', 'SC', 'GA', 'PM', 'OA', 'HC', 'HP', 'DA', 'UT']
DEFAULT_TABLE_HEAD_LOWER = [h.lower() for h in DEFAULT_TABLE_HEAD]
def to_dict(data, headers: list):
data_text = data.strip().decode()
_to_dict = {}
for key, value in zip(headers, data_text.split('\t')):
if not value:
value = None
_to_dict[key] = value
vyear = None
try:
vyear = str2int(_to_dict.get("py"), None)
if not vyear:
logger.warning("WOS号: %s,年份异常: %s" % (_to_dict["ut"], _to_dict.get("py")))
except Exception as e:
logger.exception("""
raw data: %s,
parsed dict: %s
exception: %s""" % (data, _to_dict, e))
_to_dict["py"] = vyear
return _to_dict
def parse_full_records_txt(content: bytes):
lines = content.strip().split(b'\r\n')
head_line = lines.pop(0)
try:
head_start = head_line.index(b'PT')
head_line = head_line[head_start:]
head_line = head_line.strip().decode('utf-8')
HEADERS = head_line.split('\t')
HEADERS = [s.lower() for s in HEADERS]
except ValueError:
logger.error("内容出现异常跳过: %s" % head_line)
HEADERS = ['PT', 'AU', 'Z2', 'AF', 'BA', 'BF', 'CA', 'GP', 'BE', 'TI', 'Z1', 'SO', 'Z3', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'Z5', 'ID', 'AB', 'Z4', 'C1', 'Z6', 'RP', 'EM', 'Z7', 'RI', 'OI', 'FU', 'FX', 'CR', 'NR', 'TC', 'Z9', 'Z8', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'SI', 'PN', 'SU', 'MA', 'BP', 'EP', 'AR', 'DI', 'D2', 'EA', 'EY', 'PG', 'P2', 'WC', 'SC', 'PM', 'UT', 'OA', 'HP', 'HC', 'DA', 'C3']
HEADERS = [s.lower() for s in HEADERS]
while lines:
line_data = lines.pop(0)
# print(line_data)
standard_data = to_dict(line_data, HEADERS)
# third_id = standard_data.pop('ut', None)
# if not third_id:
# continue
yield standard_data
def parse_full_records(body: Union[bytes, str]):
"""
Parse the downloaded export content.
"""
if isinstance(body, str):
body = body.encode()
item_g = parse_full_records_txt(body)
for data_dic in item_g:
yield data_dic
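A self-contained sketch of the expected export format, using a cut-down header and a single tab-separated record (CRLF line endings, values invented):
sample = b'PT\tTI\tPY\tUT\r\nJ\tSome article title\t2023\tWOS:000000000001\r\n'
rows = list(parse_full_records(sample))
# rows[0] == {'pt': 'J', 'ti': 'Some article title', 'py': 2023, 'ut': 'WOS:000000000001'}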

@ -0,0 +1,135 @@
# Scrapy settings for science_article_wos project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = "science_article_wos"
SPIDER_MODULES = ["science_article_wos.spiders"]
NEWSPIDER_MODULE = "science_article_wos.spiders"
ADDONS = {}
# Crawl responsibly by identifying yourself (and your website) on the user-agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# Concurrency and throttling settings
#CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 1
DOWNLOAD_DELAY = 1
# Disable cookies (enabled by default)
#COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
# "Accept-Language": "en",
#}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
# "science_article_wos.middlewares.ScienceArticleAddSpiderMiddleware": 543,
#}
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
RETRY_ENABLED = True
RETRY_TIMES = 2 # 2 retries (3 attempts in total)
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 400, 403, 404] # added some common error codes
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550
# "org_news.middlewares.OrgNewsDownloaderMiddleware": 543,
}
#DOWNLOADER_MIDDLEWARES = {
# "science_article_wos.middlewares.ScienceArticleAddDownloaderMiddleware": 543,
#}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
EXTENSIONS = {
# "scrapy.extensions.telnet.TelnetConsole": None,
# "science_article_wos.extensions.ackextension.ACKExtension": 0,
# "science_article_wos.extensions.dingtalk_extension.DingTalkExtension": 0,
}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
# "science_article_wos.pipelines.ScienceArticleAddPipeline": 300,
#}
# MONGO_URI = "mongodb://root:123456@192.168.1.211:27017/"
# MONGO_DATABASE = "science2"
MONGO_URI = "mongodb://science-dev:kcidea1509!%25)(@101.43.239.105:27017/?authSource=science&directConnection=true"
MONGO_DATABASE = 'science2'
MONGO_URI_SCIENCE = "mongodb://root:kcidea1509%21%25%29%28@43.140.203.187:27017/"
MONGO_DATABASE_SCIENCE = 'science'
# REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'
REDIS_URL = 'redis://:kcidea1509!%)(@43.140.203.187:6379/10'
# MySQL configuration
MYSQL_HOST = '43.140.203.187'
MYSQL_PORT = 3306
MYSQL_DATABASE = 'science_data_dept'
MYSQL_USER = 'science-data-dept'
MYSQL_PASSWORD = 'datadept1509'
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"
# DingTalk bot configuration
DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=1252fe1ef63e95ced11ac87a01e9978670e82036a516c558e524f89e11513f9f'
DINGTALK_SECRET = 'SECe77fe7cd6c0dbfcdd9ebe6ba1941ddc376be86ca717e9d68bb177b7eded71091'
# Custom message templates (optional)
DINGTALK_START_MESSAGE = "🚀 Spider started!\n**Spider**: %(spider_name)s\n**Time**: %(started_time)s"
# DINGTALK_CLOSED_MESSAGE = "✅ Spider finished!\n**Spider**: %(spider_name)s\n**Items**: %(item_scraped_count)s"
# Enable/disable specific notifications
DINGTALK_ENABLE_START = False
DINGTALK_ENABLE_FINISH = True
DINGTALK_ENABLE_ERROR = True
DINGTALK_CLOSED_MESSAGE = """📊 爬虫完成通知\n
**爬虫名称**: %(spider_name)s\n
**机构名称**: %(org_name)s\n
**任务条件**: %(task_condition)s\n
**任务ID**: %(record_id)s\n
**完成时间**: %(finished_time)s\n
**完成原因**: %(finish_reason)s\n
**采集统计**:\n
- 采集项目: %(item_scraped_count)s
- 请求响应: %(response_count)s
- 错误数量: %(error_count)s \n
**状态**: %(state)s"""

@ -0,0 +1,131 @@
import math
from datetime import datetime
from urllib.parse import urlencode
from copy import deepcopy
import scrapy
from scrapy.http.response.json import JsonResponse
from science_article_wos.items import WosCitedNumberItem, WosIdRelationItem, WosArticleTodoIdItem
from science_article_wos.models import wos_model as model
from science_article_wos.configs import wos as config
from science_article_wos.utils import tools
def calculate_next_page(next_page: int = 1, page_size: int = 100):
return (next_page - 1) * page_size + 1
class WosLatestIncrementSpider(scrapy.Spider):
name = "wos_latest_increment"
# allowed_domains = ["wos-api.clarivate.com"]
# start_urls = ["https://wos-api.clarivate.com/api/woslite"]
custom_settings = dict(
DOWNLOADER_MIDDLEWARES={
"science_article_wos.middlewares.WosStarterApiXkeyDownloaderMiddleware": 500
},
ITEM_PIPELINES={
"science_article_wos.pipelines.CitedRelation2MongoPipeline": 300,
"science_article_wos.pipelines.SchoolRelation2MongoPipeline": 350,
"science_article_wos.pipelines.DupTodoBySciencePipeline": 400,
# "science_article_wos.pipelines.DupTodoPipeline": 400,
},
EXTENSIONS={
"science_article_wos.extensions.ACKExtension": 0,
# "science_article_wos.extensions.dingtalk_extension.DingTalkExtension": 0,
},
LOG_LEVEL="INFO"
)
source = "wos"
def __init__(self, task_obj):
scrapy.Spider.__init__(self)
self.task_obj = task_obj
self.record_id = task_obj['task_id']
self.org_id = self.tolist(task_obj['org_id'])
self.org_name = self.tolist(task_obj['org_name'])
self.query_id = task_obj['query_id']
self.query_content = task_obj['content']
self.query_condition = task_obj['task_condition']
self.first_page = task_obj.get('first_page', 1)
self._records_found = 0
@staticmethod
def tolist(datas) -> list:
if isinstance(datas, (list, tuple, set)):
return list(set(datas))
else:
raise TypeError("不支持的类型:%s" % (type(datas)))
async def start(self):
full_query = self.query_content
if self.query_condition is not None:
full_query = '%(query)s%(condition)s' % {
'query': f'({self.query_content})' if self.query_condition else self.query_content,
'condition': ' ' + self.query_condition if self.query_condition else ''
}
self.logger.info(f'full_query: {full_query}')
meta = dict(q=full_query, page=self.first_page, limit=50, detail="short")
params = model.starter_documents_get(**meta)
enc_params = urlencode(params, doseq=True)
yield scrapy.Request(url=config.WOS_STARTER_DOCUMENT_API + '?' + enc_params,
meta=meta)
async def parse(self, response: JsonResponse, **kwargs):
meta = response.meta
request: scrapy.Request = response.request
task_query_id: int = self.query_id
task_org_id: list = self.org_id
task_record_id: int = self.record_id
if response.status != 200:
self.logger.warning("""
abnormal response
status code: %s
body: %s""" % (response.status, response.text))
return
req_meta = request.meta
resp_result = response.json()
metadata: dict = resp_result.get("metadata")
current_page = metadata.get("page")
records_found = metadata.get('total')
max_page = req_meta.get("MAX_PAGE")
if req_meta.get("page") == self.first_page:
self.logger.info("""
query: %s
records found: %s""" % (req_meta.get("q"), records_found))
self.set_records_found(records_found)
max_page = req_meta["MAX_PAGE"] = math.ceil(records_found / config.WOS_STARTER_PER_PAGE_LIMIT)
batch_time = datetime.now()
hits: list = resp_result.get("hits")
for record in hits:
third_id = record.get("uid")
cited_num = tools.get_list_key(array=record.get("citations"), target="count", condition=("db", "WOS"))
if cited_num:
cited_item = WosCitedNumberItem()
cited_item['third_id'] = third_id
cited_item['cited'] = cited_num
cited_item['updated_at'] = batch_time
yield cited_item
relation_item = WosIdRelationItem()
relation_item['third_id'] = third_id
relation_item['query_ids'] = [task_query_id]
relation_item['school_ids'] = task_org_id
relation_item['task_ids'] = [task_record_id]
relation_item['updated_at'] = batch_time
yield relation_item
yield WosArticleTodoIdItem(**dict(third_id=third_id, state=0))
if current_page < max_page:
meta_copy: dict = deepcopy(req_meta)
meta_copy.update({'page': meta_copy['page'] + 1})
yield scrapy.Request(
config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(model.starter_documents_get(**meta_copy)),
meta=meta_copy)
def set_records_found(self, val):
self._records_found = val
def get_records_found(self) -> int:
return self._records_found
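For reference, a sketch of the first Starter API request the spider builds when query_condition is empty; the query string itself comes from the task configuration:
params = model.starter_documents_get(q='OG=(Beijing Forestry University)', page=1, limit=50, detail='short')
url = config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(params, doseq=True)
# https://api.clarivate.com/apis/wos-starter/v1/documents?q=OG%3D%28Beijing+Forestry+University%29&db=WOS&limit=50&page=1&detail=short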

@ -0,0 +1,32 @@
from typing import List, Tuple
from datetime import datetime
def str2int(val, replace=0):
try:
val = int(val)
except (ValueError, TypeError):
val = replace
return val
def get_today_date(fmt: str = "%Y-%m-%d"):
return datetime.today().strftime(fmt)
def get_list_key(array: List[dict], target: str, condition: Tuple[str, str]):
"""
Given a list like [{key: val1, target: val2}, {key: val1, target: val2}],
return the value of `target` from the first dict whose `key` matches condition (key, val).
:param target:
:param condition:
:param array:
:return:
"""
n, v = condition
for dic in array:
if dic.get(n) == v:
return dic.get(target)
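Worked examples for the helpers above, using the citations structure the spider passes in (counts invented):
citations = [{"db": "WOS", "count": 12}, {"db": "BCI", "count": 3}]
get_list_key(array=citations, target="count", condition=("db", "WOS"))  # -> 12
str2int("2023")               # -> 2023
str2int("n/a", replace=None)  # -> None (unparseable values fall back to `replace`)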

@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = science_article_wos.settings
[deploy]
#url = http://localhost:6800/
project = science_article_wos

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/14 13:59
# @Author : zhaoxiangpeng
# @File : crawl_article_latest.py
import math
from typing import List
import pymysql
from pymysql import cursors
from twisted.internet import defer
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider
CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)'''
SELECT_RECORD_SQL = """
SELECT
b.id AS task_id,
q.id AS query_id,
q.content AS content,
b.task_condition AS task_condition,
q.source_type AS source_type,
b.is_done AS is_done
FROM
task_batch_record AS b
JOIN task_search_strategy AS q ON q.id = b.query_id
WHERE
b.is_done = 0
AND q.source_type = 1
LIMIT %(limit)s
"""
def starter_latest_all():
@defer.inlineCallbacks
def f():
client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
database='science_data_dept', user='science-data-dept',
passwd='datadept1509', )
cursor = client.cursor(cursors.DictCursor)
cursor.execute(SELECT_RECORD_SQL % {'limit': 1})
result = cursor.fetchone()
query_id = result['query_id']
cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
org_results: List[dict] = cursor.fetchall()
result['org_id'] = [org_result['org_id'] for org_result in org_results]
result['org_name'] = [org_result['org_name'] for org_result in org_results]
init_params = result
yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
process = CrawlerProcess(get_project_settings())
f()
process.start()
process.stop()
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(WosLatestIncrementSpider)
process.start()
if __name__ == '__main__':
starter_latest_all()
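For reference, the task_obj handed to WosLatestIncrementSpider ends up shaped roughly as below; the keys come from SELECT_RECORD_SQL plus the org lookup, and the values are illustrative:
init_params = {
    'task_id': 42,                                  # task_batch_record.id
    'query_id': 7,                                  # task_search_strategy.id
    'content': 'OG=(Beijing Forestry University)',  # search strategy
    'task_condition': None,                         # optional extra condition appended to the query
    'source_type': 1,
    'is_done': 0,
    'org_id': [83],
    'org_name': ['北京林业大学'],
}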