From 1bf0703dba9b14b315e1deab93e7944fe015cd33 Mon Sep 17 00:00:00 2001 From: zhaoxiangpeng <1943364377@qq.com> Date: Thu, 15 Jan 2026 16:07:18 +0800 Subject: [PATCH] =?UTF-8?q?wos:=E5=A2=9E=E9=87=8F=E9=87=87=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- science_article_wos/requirements.txt | 10 +- .../science_article_wos/configs/__init__.py | 0 .../science_article_wos/configs/wos.py | 96 +++++ .../science_article_wos/db_utils/mongo.py | 92 +++++ .../science_article_wos/extensions.py | 92 +++++ .../science_article_wos/items.py | 58 +++ .../science_article_wos/middlewares.py | 9 + .../science_article_wos/models/__init__.py | 4 + .../science_article_wos/models/wos_model.py | 331 ++++++++++++++++++ .../science_article_wos/pipelines.py | 208 +++++++++++ .../scripts/wos_parse_data.py | 76 ++++ .../science_article_wos/settings.py | 135 +++++++ .../spiders/wos_latest_increment.py | 131 +++++++ .../science_article_wos/utils/tools.py | 32 ++ science_article_wos/scrapy.cfg | 11 + .../starter/crawl_article_latest.py | 65 ++++ 16 files changed, 1349 insertions(+), 1 deletion(-) create mode 100644 science_article_wos/science_article_wos/configs/__init__.py create mode 100644 science_article_wos/science_article_wos/configs/wos.py create mode 100644 science_article_wos/science_article_wos/extensions.py create mode 100644 science_article_wos/science_article_wos/models/__init__.py create mode 100644 science_article_wos/science_article_wos/models/wos_model.py create mode 100644 science_article_wos/science_article_wos/spiders/wos_latest_increment.py create mode 100644 science_article_wos/starter/crawl_article_latest.py diff --git a/science_article_wos/requirements.txt b/science_article_wos/requirements.txt index 21fedbd..33eae14 100644 --- a/science_article_wos/requirements.txt +++ b/science_article_wos/requirements.txt @@ -1,3 +1,11 @@ sqlalchemy~=1.3.24 +requests~=2.32.4 scrapy~=2.13.3 -itemadapter~=0.11.0 \ No newline at end of file +pymongo~=4.13.0 +itemadapter~=0.11.0 +happybase~=1.2.0 +fastapi~=0.116.1 +redis~=6.2.0 +parsel~=1.10.0 +sympy~=1.14.0 +pydantic~=2.0.3 \ No newline at end of file diff --git a/science_article_wos/science_article_wos/configs/__init__.py b/science_article_wos/science_article_wos/configs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/science_article_wos/science_article_wos/configs/wos.py b/science_article_wos/science_article_wos/configs/wos.py new file mode 100644 index 0000000..b3b1ad0 --- /dev/null +++ b/science_article_wos/science_article_wos/configs/wos.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# @Time : 2024/1/16 8:41 +# @Author : zhaoxiangpeng +# @File : config.py + +from datetime import datetime + +# 数据来源名 +SOURCE_NAME = 'wos' + +WOS_SEARCH_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch" +WOS_DETAIL_LINK = 'https://webofscience.clarivate.cn/wos/woscc/full-record/{wos_id}' +WOS_DETAIL_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch' + +WOS_ADVANCED_SEARCH_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch' +WOS_EXPORT_FILE_API = 'https://webofscience.clarivate.cn/api/wosnx/indic/export/saveToFile' + +WOS_RECORD_STREAM_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryGetRecordsStream" +WOS_REFINE_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryRefine" + +# WOS starter api +WOS_STARTER_DOCUMENT_UID_API = "https://api.clarivate.com/apis/wos-starter/v1/documents/{uid}" # Unique Identifier/Accession 
Number +WOS_STARTER_DOCUMENT_API = "https://api.clarivate.com/apis/wos-starter/v1/documents" +WOS_STARTER_PER_PAGE_LIMIT = 50 # 每页限制的数量 + +# WOS lite api +WOS_LITE_QUERY_FIRST_API = 'https://wos-api.clarivate.com/api/woslite' # 第一个请求,请求后会有一个query的序号 +WOS_LITE_QUERY_API = 'https://wos-api.clarivate.com/api/woslite/query' # 使用序号进行翻页 + +# 发文表 +WOS_ARTICLE_COLLECTION = 'data_{}_article'.format(SOURCE_NAME) +# 被引量集合 +WOS_CITED_NUMBER_COLLECTION = "relation_cited_number_{}".format(SOURCE_NAME) +# 发文关系表 +SCHOOL_RELATION_COLLECTION = 'relation_school_{}'.format(SOURCE_NAME) +# 参考文献集合 +WOS_REFERENCE_COLLECTION = "relation_reference_{}".format(SOURCE_NAME) +# 待下载Id表 +ARTICLE_TODO_IDS_COLLECTION = "todo_ids_{}".format(SOURCE_NAME) + +# CSCD来源的发文表 +WOS_CSCD_ARTICLE_COLLECTION = 'data_{}_article_{}'.format(SOURCE_NAME, 'cscd') + +# cookie池配置 +# COOKIE_POOL_CONFIG = dict(host=setting.REDIS_HOST, port=6379, db=setting.REDIS_DB, password=setting.REDIS_PASSWORD) +COOKIE_POOL_GROUP = 'cookies_pool:wos:sid*' +COOKIE_POOL_KEY = 'cookies_pool:wos:sid-sjtu' +COOKIE_TTL = 60 * 60 * 4 + +# 下载的单个文件的大小 +BATCH_DOWNLOAD_LIMIT = 500 +# 导出文件时的默认值 +DEFAULT_EXPORT_RECORD_FILTER = "fullRecordPlus" # fullRecordPlus + +# 表头验证配置 +SUCCESS_TABLE_HEAD_START = b'\xef\xbb\xbfPT' +LOST_TABLE_HEAD_START = b'\xef\xbb\xbfnull' +AUTO_TABLE_HEAD_START = b'\xef\xbb\xbfPT\tAU\tBA\tBE\tGP\tAF\tBF\tCA\tTI\tSO\tSE\tBS\tLA\tDT\tCT\tCY\tCL\tSP\tHO\tDE\tID\tAB\tC1\tC3\tRP\tEM\tRI\tOI\tFU\tFP\tFX\tCR\tNR\tTC\tZ9\tU1\tU2\tPU\tPI\tPA\tSN\tEI\tBN\tJ9\tJI\tPD\tPY\tVL\tIS\tPN\tSU\tSI\tMA\tBP\tEP\tAR\tDI\tDL\tD2\tEA\tPG\tWC\tWE\tSC\tGA\tPM\tOA\tHC\tHP\tDA\tUT\r\n' + + +CORE_NAME_TABLE = dict( + WOSCC="Web of Science Core Collection", + BCI="BIOSIS Citation Index", + SCIELO="SciELO Citation Index", + RSCI="Russian Science Citation Index", + CSCD="Chinese Science Citation Database℠", + ARCI="Arabic Citation Index", + DIIDW="Derwent Innovations Index", + PPRN="", + PQDT="ProQuest ™ Dissertations & Theses Citation Index" +) +NAV_NAME_TABLE = dict( + SCI="Science Citation Index Expanded (SCI-Expanded)", + ESCI="Emerging Sources Citation Index (ESCI)", + SSCI="Social Sciences Citation Index (SSCI)", + ISTP="Conference Proceedings Citation Index – Science (CPCI-S)", + BSCI="Book Citation Index – Science (BKCI-S)", + AHCI="Arts & Humanities Citation Index (A&HCI)", + IC="Index Chemicus (IC)", + ISSHP="Conference Proceedings Citation Index – Social Sciences & Humanities (CPCI-SSH)" +) + +TASK_CONFIG = { + "school_id": 83, + "school_name": "北京林业大学", + "search_policy": """OG=(Beijing Forestry University)""", + "crawl_year": [2021, 2022, 2023], + "source_type": 1, + "priority": 10, + "is_important": 1, + "update_interval": 60 * 60 * 24 * 14, + "create_time": datetime.now(), + "last_time": datetime.now(), + "next_time": datetime.now(), + "state": 0 +} diff --git a/science_article_wos/science_article_wos/db_utils/mongo.py b/science_article_wos/science_article_wos/db_utils/mongo.py index e69de29..1fc5f7b 100644 --- a/science_article_wos/science_article_wos/db_utils/mongo.py +++ b/science_article_wos/science_article_wos/db_utils/mongo.py @@ -0,0 +1,92 @@ +from __future__ import annotations +import logging +from typing import TYPE_CHECKING, Optional, Dict, Tuple +from pymongo import MongoClient +from pymongo import UpdateOne +from pymongo.errors import DuplicateKeyError, BulkWriteError + +if TYPE_CHECKING: + from pymongo.database import Database + from pymongo.collection import Collection + from pymongo.results import InsertOneResult, InsertManyResult, 
BulkWriteResult + + +def build_update_query(update_data: dict, replace: bool = True) -> dict: + """ + 如果replace为True,则直接覆盖原有的document + """ + update_query = {} + if not update_data: + return {} + for key, val in update_data.items(): + if replace: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + else: + if isinstance(val, list): + update_query.setdefault( + "$addToSet", {} + ).update({ + key: {"$each": val} + }) + else: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + return update_query + + +def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]: + update_query = {} + if not update_data: + return {}, {} + + for key, val in update_data.items(): + if replace: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + else: + if isinstance(val, list): + update_query.setdefault( + "$addToSet", {} + ).update({ + key: {"$each": val} + }) + else: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + return filter_query, update_query + + +class MongoDBUtils: + def __init__(self, mongo_uri, mongo_db): + self.mongo_uri = mongo_uri + self.mongo_db = mongo_db + self.client: MongoClient = None + self.db: Database = None + + def insert2db(self, items, tablename, **kwargs) -> InsertOneResult: + collection: Collection = self.db.get_collection(tablename) + result: InsertOneResult = collection.insert_one(items, **kwargs) + return result + + def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult: + collection: Collection = self.db.get_collection(tablename) + result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs) + return result + + def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult: + collection: Collection = self.db.get_collection(tablename) + bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs) + return bulk_results diff --git a/science_article_wos/science_article_wos/extensions.py b/science_article_wos/science_article_wos/extensions.py new file mode 100644 index 0000000..836f182 --- /dev/null +++ b/science_article_wos/science_article_wos/extensions.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# @Time : 2026/1/14 16:17 +# @Author : zhaoxiangpeng +# @File : extensions.py +import logging +import pymysql +from scrapy import signals +from scrapy.crawler import Crawler + +logger = logging.getLogger(__name__) + + +class LatestSpiderProtocol: + name: str + record_id: int + org_id: int + org_name: str + query_id: int + query_content: str + + def get_records_found(self) -> int: ... 
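+    # Structural typing only: this protocol documents the spider attributes that
+    # ACKExtension relies on; record_id identifies the task_batch_record row to
+    # update, and get_records_found() supplies result_count when the spider closes.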
+ + +class ACKExtension: + def __init__(self, crawler: Crawler): + self.crawler = crawler + self.change_state_sql = 'update task_batch_record set %(update_kws)s where %(update_cond)s' + + @classmethod + def from_crawler(cls, crawler): + ext = cls(crawler=crawler) + crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened) + crawler.signals.connect(ext.spider_error, signal=signals.spider_error) + crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed) + return ext + + def spider_opened(self, spider): + kws = { + 'is_done': 2, + } + sql = self.change_state_sql % { + 'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]), + 'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id} + } + self._execute_sql(sql) + + def spider_closed(self, spider: LatestSpiderProtocol): + """ + # 修改任务状态 + # 通知 + """ + kws = { + 'is_done': 1, + 'result_count': spider.get_records_found(), + 'updated_time': 'CURRENT_TIMESTAMP' + } + sql = self.change_state_sql % { + 'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]), + 'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id} + } + self._execute_sql(sql) + + def spider_error(self, spider: LatestSpiderProtocol): + kws = { + 'is_done': -1, + 'updated_time': 'CURRENT_TIMESTAMP' + } + sql = self.change_state_sql % { + 'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]), + 'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id} + } + self._execute_sql(sql) + + def _execute_sql(self, sql): + settings = self.crawler.settings + client = pymysql.connect( + host=settings.get('MYSQL_HOST'), + port=settings.get('MYSQL_PORT', 3306), + database=settings.get('MYSQL_DATABASE'), + user=settings.get('MYSQL_USER'), + passwd=settings.get('MYSQL_PASSWORD'), + ) + try: + cursor = client.cursor() + cursor.execute(sql) + cursor.connection.commit() + logger.info(f'Execute SQL: {sql}') + except Exception as e: + logger.exception(e) + finally: + client.close() + diff --git a/science_article_wos/science_article_wos/items.py b/science_article_wos/science_article_wos/items.py index e69de29..1adfdbb 100644 --- a/science_article_wos/science_article_wos/items.py +++ b/science_article_wos/science_article_wos/items.py @@ -0,0 +1,58 @@ +# Define here the models for your scraped items +# +# See documentation in: +# https://docs.scrapy.org/en/latest/topics/items.html + +import scrapy + + +class ScienceArticleWosItem(scrapy.Item): + # define the fields for your item here like: + # name = scrapy.Field() + pass + + +class AddItemBase(scrapy.Item): + third_id = scrapy.Field() + updated_at = scrapy.Field() + + +class ArticleItem(AddItemBase): + exported = scrapy.Field() + + +class ArticleCitedItem(AddItemBase): + cited = scrapy.Field() + + +class WosArticleItem(ArticleItem): + """wos文章item""" + __tablename__ = 'data_wos_article' + + third_id = scrapy.Field() + exported = scrapy.Field() + updated_at = scrapy.Field() + + +class WosIdRelationItem(AddItemBase): + __tablename__ = 'relation_school_wos' + + query_ids = scrapy.Field() + school_ids = scrapy.Field() + task_ids = scrapy.Field() + + +class WosArticleTodoIdItem(scrapy.Item): + __tablename__ = 'todo_ids_wos' + + third_id = scrapy.Field() + state = scrapy.Field() + + +class WosCitedNumberItem(ArticleCitedItem): + __tablename__ = 'relation_cited_number_wos' + + """发文被引量item""" + third_id = scrapy.Field() + cited = scrapy.Field() + updated_at = scrapy.Field() diff --git a/science_article_wos/science_article_wos/middlewares.py 
b/science_article_wos/science_article_wos/middlewares.py index c5516bb..4704df6 100644 --- a/science_article_wos/science_article_wos/middlewares.py +++ b/science_article_wos/science_article_wos/middlewares.py @@ -108,6 +108,15 @@ class ScienceArticleWosDownloaderMiddleware: spider.logger.info("Spider opened: %s" % spider.name) +class WosStarterApiXkeyDownloaderMiddleware: + async def process_request(self, request, spider): + key_param = { + 'X-ApiKey': '53b8164e7543ccebe489988287e8b871bc2c0880' + } + request.headers.update(key_param) + # return request + + class WosCookieMiddleware: def __init__(self, redis_uri: str): self.redis_cli = redis.from_url(redis_uri, decode_responses=True) diff --git a/science_article_wos/science_article_wos/models/__init__.py b/science_article_wos/science_article_wos/models/__init__.py new file mode 100644 index 0000000..1209206 --- /dev/null +++ b/science_article_wos/science_article_wos/models/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @Time : 2026/1/14 14:20 +# @Author : zhaoxiangpeng +# @File : __init__.py.py diff --git a/science_article_wos/science_article_wos/models/wos_model.py b/science_article_wos/science_article_wos/models/wos_model.py new file mode 100644 index 0000000..1ebc248 --- /dev/null +++ b/science_article_wos/science_article_wos/models/wos_model.py @@ -0,0 +1,331 @@ +# -*- coding: utf-8 -*- +# @Time : 2023/7/13 9:40 +# @Author : zhaoxiangpeng +# @File : model.py + +import json +import enum +import warnings +from typing import List, Tuple, Any, Dict, Union +from urllib.parse import urlencode + +from science_article_wos.configs import wos as config + +false = False +true = True +null = None + + +class WosDB(enum.Enum): + WOS = 1 + CSCD = 2 + + +class AnalyzesEnum(enum.Enum): + WOSCC = ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6", "DR.Value.6", "ECR.Value.6", + "PY.Field_D.6", "DT.Value.6", "AU.Value.6", "DX2NG.Value.6", "PEERREVIEW.Value.6"] + CSCD = ["TP.Value.6", "DR.Value.6", "OA.Value.6", "PY.Field_D.6", "DT.Value.6", "SJ.Value.6", "AU.Value.6", + "OG.Value.6", "SO.Value.6"] + + +ColNameMap = dict(WOS='WOSCC', CSCD='CSCD') + + +def calculate_next_page(next_page: int = 1, page_size: int = 100): + """ + 计算下一页的游标,即记录的序号 + :param next_page: 下一页的页码 + :param page_size: 每页的大小 + :return: + """ + return (next_page - 1) * page_size + 1 + + +def lite_base_model(usr_query: str, db_id: int = None, first_record: int = 1, page_size: int = 100, **kwargs): + if db_id is None: + db_id = 1 + if first_record > 1e5: + warnings.warn('first_record 必须在 1 ~ 100000 之间') + model = { + 'databaseId': WosDB(db_id).name, + 'firstRecord': first_record, + 'count': page_size, + 'usrQuery': usr_query + } + # return urlencode(model) + return model + + +def lite_query_model(db_id: int = None, first_record: int = 1, page_size: int = 100, **kwargs): + if db_id is None: + db_id = 1 + model = { + 'databaseId': WosDB(db_id).name, + 'firstRecord': first_record, + 'count': page_size, + } + return urlencode(model) + + +def starter_documents_uid_get(uid, detail: str = None): + """ + + :param uid: + :param detail: + :return: + """ + _query_params: List[Tuple[str, str]] = [] + if detail is not None: + _query_params.append(("detail", detail)) + + +def starter_documents_get(q, db: WosDB = WosDB.WOS.name, limit: int = config.WOS_STARTER_PER_PAGE_LIMIT, page: int = 1, sort_field: str = None, + modified_time_span=None, tc_modified_time_span=None, detail=None, **kwargs): + """ + :param q: + :param db: + :param limit: 最大为50 + :param page: 
当limit为50时,范围为1~2000,也就是最多10w条 + :param sort_field: + :param modified_time_span: + :param tc_modified_time_span: + :param detail: 默认全部数据,如果值为short,返回较少的字段(uid, links{record,citingArticles,references,related}, citations[{db,count}], identifiers{doi,issn}) + :param kwargs: + :return: + """ + _query_params: List[Tuple[str, str]] = [] + _query_params.append(("q", q)) + if db: pass + _query_params.append(("db", db)) + _query_params.append(("limit", limit)) + _query_params.append(("page", page)) + if detail is not None: + _query_params.append(("detail", detail)) + return _query_params + + +def make_advanced_search_ut(query: str = None, wos_ids: List = None, limit: int = 50, col_name: str = "WOS") -> Dict[ + str, Any]: + if query is None: + if wos_ids is None: + raise ValueError('query 和 wos_ids 必须满足其中一个不为None') + query = ' OR '.join([f'UT=({wos_id})' for wos_id in wos_ids]) + # 通过一个自定义的名字去拿核心 + product = ColNameMap[col_name] + model = { + "product": product, + "searchMode": "general", + "viewType": "search", + "serviceMode": "summary", + "search": { + "mode": "general", + "database": product, + "query": [ + { + "rowText": query + } + ], + "sets": [], + "options": { + "lemmatize": "On" + } + }, + "retrieve": { + "count": limit, + "history": True, + "jcr": True, + "sort": "relevance", + "analyzes": getattr(AnalyzesEnum, product).value + }, + "eventMode": None, + "isPreprintReview": False + } + return model + + +def export_search_data_to_txt( + q_id: str, + mark_from: int = 1, + mark_to: int = 500, + col_name: str = "WOS", + filters: str = config.DEFAULT_EXPORT_RECORD_FILTER +) -> Dict[str, Any]: + """ + 导出搜索到的记录 + :param q_id: 通过检索得到的检索结果id + :param mark_from: 记录开始,包含 + :param mark_to: 记录结束,包含 + :param col_name: 来源库/核心 + :param filters: fullRecord(完整记录)/fullRecordPlus(完整记录和参考文献) + :return: + """ + if mark_to - mark_from > 500: + mark_to = mark_from + 499 + model = {"parentQid": q_id, "sortBy": "relevance", + "displayTimesCited": "true", "displayCitedRefs": "true", "product": "UA", "colName": col_name, + "displayUsageInfo": "true", "fileOpt": "othersoftware", "action": "saveToTab", + "markFrom": str(mark_from), "markTo": str(mark_to), + "view": "summary", "isRefQuery": "false", "locale": "zh_CN", "filters": filters} + return model + + +def article_detail_model(uts: Union[List[str], str], core: str = "WOSCC"): + """ + 详情 https://webofscience.clarivate.cn/wos/woscc/full-record/{wos_id} + 接口 https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch + :param uts: + :param core: + :return: + """ + if isinstance(uts, str): + uts = [uts] + model = { + "eventMode": null, + "isPreprintReview": false, + "product": core, + "retrieve": { + "first": 1, "links": "retrieve", "sort": "relevance", "count": 1, "view": "super", + "coll": null, "activity": false, "analyzes": null, "jcr": true, "reviews": true, + "highlight": null, + "secondaryRetrieve": { + "associated_data": { + "sort": "relevance", "count": 10 + }, + "cited_references": { + "sort": "author-ascending", "count": 30 + }, + "citing_article": { + "sort": "date", "count": 2, "links": null, "view": "mini" + }, + "cited_references_with_context": { + "sort": "date", "count": 135, "view": "mini" + }, + "recommendation_articles": { + "sort": "recommendation-relevance", "count": 5, "links": null, "view": "mini" + }, + "grants_to_wos_records": { + "sort": "date-descending", "count": 30, "links": null, "view": "mini" + } + } + }, + "search": { + "database": core, + "mode": "record_ids", + "uts": uts + }, + "searchMode": "record_ids", + "viewType": "search", 
+ "serviceMode": "summary", + } + return model + + +# 被引用专用model +def get_wos_core_cites( + uts_or_qid: str, + year_range: tuple = None, + core: str = "WOSCC", + parent_db: str = "WOSCC", + is_refine: bool = False +): + """ + https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch + :param uts_or_qid: + :param year_range: 筛选的年份范围 + :param core: 检索的数据库 + :param parent_db: + :param is_refine: 是否是精炼检索 + :return: + """ + model = { + "eventMode": null, + "isPreprintReview": false, + "product": core, + + "search": {"database": core, "mode": "citing_article", "parentDatabase": parent_db, + "parentDoc": null, + "parentId": {"type": "colluid", "value": uts_or_qid}, + "parentQid": null, "parentSort": null}, + # "retrieve": { + # "sort": "date-descending", + # "count": 50, + # "jcr": true, + # "history": true, + # "analyzes": ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6", + # "DR.Value.6", "ECR.Value.6", "PY.Field_D.6", "DT.Value.6", "AU.Value.6", + # "DX2NG.Value.6", "PEERREVIEW.Value.6"] + # }, + + "searchMode": "citing_article", + "serviceMode": "summary", + "viewType": "search", + } + refines = [] + if year_range: + is_refine = True + years = list(range(*year_range)) + [year_range[-1]] + refines.append(dict( + index="PY", value=[str(year) for year in years] + )) + len(refines) and model.update({"refines": refines}) + if is_refine: + model.setdefault("qid", uts_or_qid) + model.pop("search") + model.pop("isPreprintReview") + model.update(viewType="refine") + return model + + +def get_aggregation_wos_cited(q_id: str, core: str = "WOSCC"): + """ + 获取各核心引用的聚合 + https://webofscience.clarivate.cn/api/wosnx/core/runQueryGetRecordsStream + """ + model = { + "product": core, + "qid": q_id, + "retrieve": { + "analyzes": ["EDN.Value.200"] + }, + "searchMode": "citing_article", + "viewType": "records" + } + return model + + +def get_refine_count(q_id: str, count: int = 5): + model = { + "eventMode": null, + "product": "WOSCC", + "qid": q_id, + "refines": [ + {"index": "EDN", "value": ["WOS.SCI", "WOS.SSCI", "WOS.AHCI"]} + ], + # "retrieve": { + # "count": count, "sort": "date-descending", "history": true, "jcr": true, + # "analyzes": ["TP.Value.6", "REVIEW.Value.6", "EARLY ACCESS.Value.6", "OA.Value.6", + # "DR.Value.6", "ECR.Value.6", "PY.Field_D.6", "DT.Value.6", "AU.Value.6", + # "DX2NG.Value.6", "PEERREVIEW.Value.6"] + # }, + "searchMode": "citing_article", + "serviceMode": "summary", + "viewType": "refine", + } + return model + + +def get_record_info(body: bytes, sep: Union[str, bytes] = b'\n'): + resp_texts = body.strip().split(sep) + query_id = None + records_found = 0 + for resp_text in resp_texts: + resp_row_dict: dict = json.loads(resp_text) + if resp_row_dict.get("key") == "searchInfo": + query_id = resp_row_dict.get("payload", {}).get("QueryID") + records_found = resp_row_dict.get("payload", {}).get("RecordsFound") # 找到的记录 + break # 找到就结束 + return query_id, records_found + + +if __name__ == '__main__': + m1 = lite_base_model(WosDB.WOS) diff --git a/science_article_wos/science_article_wos/pipelines.py b/science_article_wos/science_article_wos/pipelines.py index e69de29..5c13a08 100644 --- a/science_article_wos/science_article_wos/pipelines.py +++ b/science_article_wos/science_article_wos/pipelines.py @@ -0,0 +1,208 @@ +# Define your item pipelines here +# +# Don't forget to add your pipeline to the ITEM_PIPELINES setting +# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html + + +# useful for handling different item types with a single interface +from 
__future__ import annotations +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Tuple, Union, Optional + +import scrapy +from scrapy import signals +from itemadapter import ItemAdapter +from pymongo import MongoClient +from itemadapter import ItemAdapter +from pymongo.errors import ( + DuplicateKeyError, + BulkWriteError +) +from science_article_wos.items import WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem +from science_article_wos.db_utils.mongo import MongoDBUtils, update_document, build_update_query + +if TYPE_CHECKING: + from scrapy.crawler import Crawler + from scrapy.statscollectors import StatsCollector + from pymongo.collection import Collection + +mongo_logger = logging.getLogger('pymongo') +mongo_logger.setLevel(logging.WARNING) +logger = logging.getLogger(__name__) + + +class ScienceArticleWosPipeline: + def process_item(self, item, spider): + return item + + +class MongoPipeline(MongoDBUtils): + def __init__(self, mongo_uri, mongo_db, stats: StatsCollector): + super().__init__(mongo_uri, mongo_db) + self.stats: StatsCollector = stats + self.insert_failure_update_enable = True + self.duplicate_cover_enable = False # 重复项覆盖 + + @classmethod + def from_crawler(cls, crawler: Crawler): + m = cls( + mongo_uri=crawler.settings.get("MONGO_URI"), + mongo_db=crawler.settings.get("MONGO_DATABASE", "items"), + stats=crawler.stats + ) + return m + + def open_spider(self, spider): + self.client = MongoClient(self.mongo_uri) + self.db = self.client[self.mongo_db] + + def process_item(self, item, spider) -> scrapy.Item: + """ + 插入遇到错误不处理 + """ + adapter = ItemAdapter(item) + tablename = self._get_item_table(item) + collection = self.db.get_collection(tablename) + d = adapter.asdict() + try: + collection.insert_one(d) + self.stats.inc_value("item2db_inserted/{}".format(tablename)) + except DuplicateKeyError as duplicate_error: + self.stats.inc_value("item2db_duplicate/{}".format(tablename)) + self.stats.inc_value(f"item_dropped_reasons_count/duplicate") + except Exception: + raise + return item + + def process_item_update(self, item, spider) -> scrapy.Item: + """ + 插入遇到错误进行更新 + """ + adapter = ItemAdapter(item) + tablename = self._get_item_table(item) + collection = self.db.get_collection(tablename) + d = adapter.asdict() + try: + collection.insert_one(d) + self.stats.inc_value("item2db_inserted/{}".format(tablename)) + except DuplicateKeyError as duplicate_error: + if self.insert_failure_update_enable: + write_error = duplicate_error.details + filter_query, update_query = self._pick_filter_update(write_error, doc=d) + updated_at_query = None # 删除不确定因素的时间防止影响更新(更新除了task_id外的字段不需要处理这个) + + key_pattern = write_error.get('keyPattern') + key_value = write_error.get('keyValue') + logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value) + + # 专门用来适配增量的任务 + task_ids = update_query.pop("task_ids", None) + if task_ids: + # task_id一定会引起变动,所以先处理 + task_id_query = {'task_ids': task_ids} + collection.update_one(filter=filter_query, update=build_update_query(task_id_query, replace=False)) + updated_at_query = {"updated_at": update_query.pop('updated_at', None)} + + update_q = build_update_query(update_query, replace=self.duplicate_cover_enable) + up_result = collection.update_one(filter=key_value, update=update_q, upsert=True) + if up_result.matched_count == up_result.modified_count == 1: + # 变动了就要修改更新的时间(其实没变也要更新,这样可以知道什么时候动过这条数据) + if updated_at_query: + collection.update_one(filter=key_value, update={"$set": updated_at_query}) + 
self.stats.inc_value("item2db_updated/{}".format(tablename)) + except Exception: + raise + + return item + + @staticmethod + def _pick_filter_update(write_error, doc: dict = None): + original_doc = write_error.get('op', doc) # 插入的数据 + key_pattern = write_error.get('keyPattern') + original_doc.pop("_id", None) # 删掉插入失败产生的_id + filter_query = {} + update_query = {key: val for key, val in original_doc.items() if val} + + for key in key_pattern.keys(): + filter_query.update({key: update_query.pop(key, None)}) + return filter_query, update_query + + def close_spider(self, spider): + self.client.close() + + @staticmethod + def _get_item_table(item) -> str: + """获取Item类型""" + if hasattr(item, '__tablename__'): + return item.__class__.__tablename__ + return 'items_null_table' + + +class CitedRelation2MongoPipeline(MongoPipeline): + def process_item(self, item, spider): + # 确定Item类型 + if isinstance(item, WosCitedNumberItem): + super().process_item_update(item, spider=spider) + + return item + + +class SchoolRelation2MongoPipeline(MongoPipeline): + def process_item(self, item, spider): + # 确定Item类型 + if isinstance(item, WosIdRelationItem): + super().process_item_update(item, spider=spider) + + return item + + +class DupTodoPipeline(MongoPipeline): + def process_item(self, item, spider): + if isinstance(item, WosArticleTodoIdItem): + if self.is_exists(item, self._get_dup_key(spider)): + return item + super().process_item(item, spider=spider) + return item + + def is_exists(self, item, filter_key) -> bool: + fingerprints = item.get('third_id') + collection: Collection = self.db.get_collection(filter_key) + results = collection.find_one(filter={"third_id": fingerprints}, + projection={"_id": 0, "third_id": 1, "exported.da": 1}) + if isinstance(results, dict) and results.get('exported') and results.get('third_id') == fingerprints: + self.inc_item_dropped_count("exists") + return True + return False + + def _get_dup_key(self, spider): + return 'data_%(source_type)s_article' % {"source_type": spider.source} + + def inc_item_dropped_count(self, reason): + self.stats.inc_value("item_dropped_count") + self.stats.inc_value(f"item_dropped_reasons_count/{reason}") + + +class DupTodoBySciencePipeline(DupTodoPipeline): + dup_collection: Optional[Collection] = None + + # def __init__(self, mongo_uri, mongo_db, stats: StatsCollector): + # super().__init__(mongo_uri, mongo_db, stats=stats) + # self.dup_collection = None + + def open_spider(self, spider): + super().open_spider(spider) + settings = spider.settings + dup_uri = settings.get("MONGO_URI_SCIENCE") + cli = MongoClient(dup_uri) + dup_db = cli.get_database("MONGO_DATABASE_SCIENCE") + self.dup_collection = dup_db.get_collection('wos_raw_data') + + def is_exists(self, item, filter_key) -> bool: + fingerprints = item.get('third_id') + results = self.dup_collection.find_one(filter={"_id": fingerprints}, + projection={"_id": 1}) + if isinstance(results, dict) and (results.get('_id') == fingerprints): + self.inc_item_dropped_count("exists") + return True + return False diff --git a/science_article_wos/science_article_wos/scripts/wos_parse_data.py b/science_article_wos/science_article_wos/scripts/wos_parse_data.py index e69de29..d5d3843 100644 --- a/science_article_wos/science_article_wos/scripts/wos_parse_data.py +++ b/science_article_wos/science_article_wos/scripts/wos_parse_data.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# @Time : 2024/3/5 16:05 +# @Author : zhaoxiangpeng +# @File : parse_data.py + +import logging +from typing import Union +from 
science_article_wos.utils.tools import str2int +logger = logging.getLogger(__name__) + + +DEFAULT_TABLE_HEAD = ['PT', 'AU', 'BA', 'BE', 'GP', 'AF', 'BF', 'CA', 'TI', 'SO', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'ID', 'AB', 'C1', 'C3', 'RP', 'EM', 'RI', 'OI', 'FU', 'FP', 'FX', 'CR', 'NR', 'TC', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'PN', 'SU', 'SI', 'MA', 'BP', 'EP', 'AR', 'DI', 'DL', 'D2', 'EA', 'PG', 'WC', 'WE', 'SC', 'GA', 'PM', 'OA', 'HC', 'HP', 'DA', 'UT'] +DEFAULT_TABLE_HEAD_LOWER = [h.lower() for h in DEFAULT_TABLE_HEAD] + + +def to_dict(data, headers: list): + data_text = data.strip().decode() + _to_dict = {} + + for key, value in zip(headers, data_text.split('\t')): + if not value: + value = None + _to_dict[key] = value + + vyear = None + str2int(_to_dict.get("py"), None) + try: + vyear = str2int(_to_dict.get("py"), None) + if not vyear: + logger.warning("WOS号: %s,年份异常: %s" % (_to_dict["ut"], _to_dict.get("py"))) + except Exception as e: + logger.exception(""" + 原始数据: %s, + 数据字典: %s + 异常信息: %s""" % (data, _to_dict, e)) + + _to_dict["py"] = vyear + + return _to_dict + + +def parse_full_records_txt(content: bytes): + lines = content.strip().split(b'\r\n') + head_line = lines.pop(0) + try: + head_start = head_line.index(b'PT') + head_line = head_line[head_start:] + head_line = head_line.strip().decode('utf-8') + HEADERS = head_line.split('\t') + HEADERS = [s.lower() for s in HEADERS] + except ValueError: + logger.error("内容出现异常跳过: %s" % head_line) + HEADERS = ['PT', 'AU', 'Z2', 'AF', 'BA', 'BF', 'CA', 'GP', 'BE', 'TI', 'Z1', 'SO', 'Z3', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'Z5', 'ID', 'AB', 'Z4', 'C1', 'Z6', 'RP', 'EM', 'Z7', 'RI', 'OI', 'FU', 'FX', 'CR', 'NR', 'TC', 'Z9', 'Z8', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'SI', 'PN', 'SU', 'MA', 'BP', 'EP', 'AR', 'DI', 'D2', 'EA', 'EY', 'PG', 'P2', 'WC', 'SC', 'PM', 'UT', 'OA', 'HP', 'HC', 'DA', 'C3'] + HEADERS = [s.lower() for s in HEADERS] + + while lines: + line_data = lines.pop(0) + # print(line_data) + standard_data = to_dict(line_data, HEADERS) + # third_id = standard_data.pop('ut', None) + # if not third_id: + # continue + yield standard_data + + +def parse_full_records(body: Union[bytes, str]): + """ + 解析响应的下载内容 + """ + if isinstance(body, str): + body = body.encode() + item_g = parse_full_records_txt(body) + for data_dic in item_g: + yield data_dic + + diff --git a/science_article_wos/science_article_wos/settings.py b/science_article_wos/science_article_wos/settings.py index e69de29..f3cb79a 100644 --- a/science_article_wos/science_article_wos/settings.py +++ b/science_article_wos/science_article_wos/settings.py @@ -0,0 +1,135 @@ +# Scrapy settings for science_article_wos project +# +# For simplicity, this file contains only settings considered important or +# commonly used. 
You can find more settings consulting the documentation: +# +# https://docs.scrapy.org/en/latest/topics/settings.html +# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html +# https://docs.scrapy.org/en/latest/topics/spider-middleware.html + +BOT_NAME = "science_article_wos" + +SPIDER_MODULES = ["science_article_wos.spiders"] +NEWSPIDER_MODULE = "science_article_wos.spiders" + +ADDONS = {} + +# Crawl responsibly by identifying yourself (and your website) on the user-agent +USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36' + +# Obey robots.txt rules +ROBOTSTXT_OBEY = False + +# Concurrency and throttling settings +#CONCURRENT_REQUESTS = 16 +CONCURRENT_REQUESTS_PER_DOMAIN = 1 +DOWNLOAD_DELAY = 1 + +# Disable cookies (enabled by default) +#COOKIES_ENABLED = False + +# Disable Telnet Console (enabled by default) +#TELNETCONSOLE_ENABLED = False + +# Override the default request headers: +#DEFAULT_REQUEST_HEADERS = { +# "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", +# "Accept-Language": "en", +#} + +# Enable or disable spider middlewares +# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html +#SPIDER_MIDDLEWARES = { +# "science_article_wos.middlewares.ScienceArticleAddSpiderMiddleware": 543, +#} + +# Enable or disable downloader middlewares +# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html +RETRY_ENABLED = True +RETRY_TIMES = 2 # 重试3次 +RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 400, 403, 404] # 增加了一些常见的错误码 +DOWNLOADER_MIDDLEWARES = { + 'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550 + # "org_news.middlewares.OrgNewsDownloaderMiddleware": 543, +} +#DOWNLOADER_MIDDLEWARES = { +# "science_article_wos.middlewares.ScienceArticleAddDownloaderMiddleware": 543, +#} + +# Enable or disable extensions +# See https://docs.scrapy.org/en/latest/topics/extensions.html +EXTENSIONS = { + # "scrapy.extensions.telnet.TelnetConsole": None, + # "science_article_wos.extensions.ackextension.ACKExtension": 0, + # "science_article_wos.extensions.dingtalk_extension.DingTalkExtension": 0, +} + +# Configure item pipelines +# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html +#ITEM_PIPELINES = { +# "science_article_wos.pipelines.ScienceArticleAddPipeline": 300, +#} +# MONGO_URI = "mongodb://root:123456@192.168.1.211:27017/" +# MONGO_DATABASE = "science2" +MONGO_URI = "mongodb://science-dev:kcidea1509!%25)(@101.43.239.105:27017/?authSource=science&directConnection=true" +MONGO_DATABASE = 'science2' + +MONGO_URI_SCIENCE = "mongodb://root:kcidea1509%21%25%29%28@43.140.203.187:27017/" +MONGO_DATABASE_SCIENCE = 'science' + +# REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10' +REDIS_URL = 'redis://:kcidea1509!%)(@43.140.203.187:6379/10' + +# mysql配置 +MYSQL_HOST = '43.140.203.187' +MYSQL_PORT = 3306 +MYSQL_DATABASE = 'science_data_dept' +MYSQL_USER = 'science-data-dept' +MYSQL_PASSWORD = 'datadept1509' + +# Enable and configure the AutoThrottle extension (disabled by default) +# See https://docs.scrapy.org/en/latest/topics/autothrottle.html +#AUTOTHROTTLE_ENABLED = True +# The initial download delay +#AUTOTHROTTLE_START_DELAY = 5 +# The maximum download delay to be set in case of high latencies +#AUTOTHROTTLE_MAX_DELAY = 60 +# The average number of requests Scrapy should be sending in parallel to +# each remote server +#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0 +# Enable showing throttling stats for every response received: 
+#AUTOTHROTTLE_DEBUG = False + +# Enable and configure HTTP caching (disabled by default) +# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings +#HTTPCACHE_ENABLED = True +#HTTPCACHE_EXPIRATION_SECS = 0 +#HTTPCACHE_DIR = "httpcache" +#HTTPCACHE_IGNORE_HTTP_CODES = [] +#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage" + +# Set settings whose default value is deprecated to a future-proof value +FEED_EXPORT_ENCODING = "utf-8" + +# 钉钉机器人配置 +DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=1252fe1ef63e95ced11ac87a01e9978670e82036a516c558e524f89e11513f9f' +DINGTALK_SECRET = 'SECe77fe7cd6c0dbfcdd9ebe6ba1941ddc376be86ca717e9d68bb177b7eded71091' +# 自定义消息模板(可选) +DINGTALK_START_MESSAGE = "🚀 爬虫启动啦!\n**爬虫**: %(spider_name)s\n**时间**: %(started_time)s" +# DINGTALK_CLOSED_MESSAGE = "✅ 爬虫完成!\n**爬虫**: %(spider_name)s\n**项目数**: %(item_scraped_count)s" +# 启用/禁用特定通知 +DINGTALK_ENABLE_START = False +DINGTALK_ENABLE_FINISH = True +DINGTALK_ENABLE_ERROR = True +DINGTALK_CLOSED_MESSAGE = """📊 爬虫完成通知\n +**爬虫名称**: %(spider_name)s\n +**机构名称**: %(org_name)s\n +**任务条件**: %(task_condition)s\n +**任务ID**: %(record_id)s\n +**完成时间**: %(finished_time)s\n +**完成原因**: %(finish_reason)s\n +**采集统计**:\n + - 采集项目: %(item_scraped_count)s 条 + - 请求响应: %(response_count)s 次 + - 错误数量: %(error_count)s 个\n +**状态**: %(state)s""" diff --git a/science_article_wos/science_article_wos/spiders/wos_latest_increment.py b/science_article_wos/science_article_wos/spiders/wos_latest_increment.py new file mode 100644 index 0000000..1ccdaf2 --- /dev/null +++ b/science_article_wos/science_article_wos/spiders/wos_latest_increment.py @@ -0,0 +1,131 @@ +import math +from datetime import datetime +from urllib.parse import urlencode +from copy import deepcopy + +import scrapy +from scrapy.http.response.json import JsonResponse + +from science_article_wos.items import WosCitedNumberItem, WosIdRelationItem, WosArticleTodoIdItem +from science_article_wos.models import wos_model as model +from science_article_wos.configs import wos as config +from science_article_wos.utils import tools + + +def calculate_next_page(next_page: int = 1, page_size: int = 100): + return (next_page - 1) * page_size + 1 + + +class WosLatestIncrementSpider(scrapy.Spider): + name = "wos_latest_increment" + # allowed_domains = ["wos-api.clarivate.com"] + # start_urls = ["https://wos-api.clarivate.com/api/woslite"] + custom_settings = dict( + DOWNLOADER_MIDDLEWARES={ + "science_article_wos.middlewares.WosStarterApiXkeyDownloaderMiddleware": 500 + }, + ITEM_PIPELINES={ + "science_article_wos.pipelines.CitedRelation2MongoPipeline": 300, + "science_article_wos.pipelines.SchoolRelation2MongoPipeline": 350, + "science_article_wos.pipelines.DupTodoBySciencePipeline": 400, + # "science_article_wos.pipelines.DupTodoPipeline": 400, + }, + EXTENSIONS={ + "science_article_wos.extensions.ACKExtension": 0, + # "science_article_wos.extensions.dingtalk_extension.DingTalkExtension": 0, + }, + LOG_LEVEL="INFO" + ) + source = "wos" + + def __init__(self, task_obj): + scrapy.Spider.__init__(self) + self.task_obj = task_obj + self.record_id = task_obj['task_id'] + self.org_id = self.tolist(task_obj['org_id']) + self.org_name = self.tolist(task_obj['org_name']) + self.query_id = task_obj['query_id'] + self.query_content = task_obj['content'] + self.query_condition = task_obj['task_condition'] + + self.first_page = task_obj.get('first_page', 1) + self._records_found = 0 + + @staticmethod + def tolist(datas) -> 
list: + if isinstance(datas, (list, tuple, set)): + return list(set(datas)) + else: + raise TypeError("不支持的类型:%s" % (type(datas))) + + async def start(self): + full_query = self.query_content + if self.query_condition is not None: + full_query = '%(query)s%(condition)s' % { + 'query': f'({self.query_content})' if self.query_condition else self.query_content, + 'condition': ' ' + self.query_condition if self.query_condition else '' + } + self.logger.info(f'full_query: {full_query}') + meta = dict(q=full_query, page=self.first_page, limit=50, detail="short") + params = model.starter_documents_get(**meta) + enc_params = urlencode(params, doseq=True) + yield scrapy.Request(url=config.WOS_STARTER_DOCUMENT_API + '?' + enc_params, + meta=meta) + + async def parse(self, response: JsonResponse, **kwargs): + meta = response.meta + request: scrapy.Request = response.request + task_query_id: int = self.query_id + task_org_id: list = self.org_id + task_record_id: int = self.record_id + + if response.status != 200: + self.logger.warning(""" + 响应异常 + 状态码: %s + 响应内容: %s""" % (response.status, response.text)) + req_meta = request.meta + resp_result = response.json() + metadata: dict = resp_result.get("metadata") + current_page = metadata.get("page") + records_found = metadata.get('total') + + max_page = req_meta.get("MAX_PAGE") + if req_meta.get("page") == self.first_page: + self.logger.info(""" + 检索式: %s + 检索到结果: %s""" % (req_meta.get("q"), records_found)) + self.set_records_found(records_found) + max_page = req_meta["MAX_PAGE"] = math.ceil(records_found / config.WOS_STARTER_PER_PAGE_LIMIT) + batch_time = datetime.now() + hits: list = resp_result.get("hits") + for record in hits: + third_id = record.get("uid") + cited_num = tools.get_list_key(array=record.get("citations"), target="count", condition=("db", "WOS")) + if cited_num: + cited_item = WosCitedNumberItem() + cited_item['third_id'] = third_id + cited_item['cited'] = cited_num + cited_item['updated_at'] = batch_time + yield cited_item + relation_item = WosIdRelationItem() + relation_item['third_id'] = third_id + relation_item['query_ids'] = [task_query_id] + relation_item['school_ids'] = task_org_id + relation_item['task_ids'] = [task_record_id] + relation_item['updated_at'] = batch_time + yield relation_item + yield WosArticleTodoIdItem(**dict(third_id=third_id, state=0)) + + if current_page < max_page: + meta_copy: dict = deepcopy(req_meta) + meta_copy.update({'page': meta_copy['page'] + 1}) + yield scrapy.Request( + config.WOS_STARTER_DOCUMENT_API + '?' 
+ urlencode(model.starter_documents_get(**meta_copy)), + meta=meta_copy) + + def set_records_found(self, val): + self._records_found = val + + def get_records_found(self) -> int: + return self._records_found diff --git a/science_article_wos/science_article_wos/utils/tools.py b/science_article_wos/science_article_wos/utils/tools.py index e69de29..850ee7f 100644 --- a/science_article_wos/science_article_wos/utils/tools.py +++ b/science_article_wos/science_article_wos/utils/tools.py @@ -0,0 +1,32 @@ +from typing import List, Tuple +from datetime import datetime + + +def str2int(val, replace=0): + try: + val = int(val) + except ValueError: + val = replace + except TypeError: + val = replace + return val + + +def get_today_date(fmt: str = "%Y-%m-%d"): + return datetime.today().strftime(fmt) + + +def get_list_key(array: List[dict], target: str, condition: Tuple[str, str]): + """ + 给定一个list [{key: val1, target: val2}, {key: val1, target: val2}] + 根据condition(key=val)返回第一个target对应的值 + :param target: + :param condition: + :param array: + :return: + """ + n, v = condition + for dic in array: + if dic.get(n) == v: + return dic.get(target) + diff --git a/science_article_wos/scrapy.cfg b/science_article_wos/scrapy.cfg index e69de29..2900109 100644 --- a/science_article_wos/scrapy.cfg +++ b/science_article_wos/scrapy.cfg @@ -0,0 +1,11 @@ +# Automatically created by: scrapy startproject +# +# For more information about the [deploy] section see: +# https://scrapyd.readthedocs.io/en/latest/deploy.html + +[settings] +default = science_article_wos.settings + +[deploy] +#url = http://localhost:6800/ +project = science_article_wos diff --git a/science_article_wos/starter/crawl_article_latest.py b/science_article_wos/starter/crawl_article_latest.py new file mode 100644 index 0000000..930b2c9 --- /dev/null +++ b/science_article_wos/starter/crawl_article_latest.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# @Time : 2026/1/14 13:59 +# @Author : zhaoxiangpeng +# @File : crawl_article_latest.py + +import math +from typing import List +import pymysql +from pymysql import cursors +from twisted.internet import defer +from scrapy.crawler import CrawlerProcess +from scrapy.utils.project import get_project_settings +from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider + +CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)''' +SELECT_RECORD_SQL = """ +SELECT + b.id AS task_id, + q.id AS query_id, + q.content AS content, + b.task_condition AS task_condition, + q.source_type AS source_type, + b.is_done AS is_done +FROM + task_batch_record AS b + JOIN task_search_strategy AS q ON q.id = b.query_id +WHERE + b.is_done = 0 + AND q.source_type = 1 + LIMIT %(limit)s +""" + + +def starter_latest_all(): + @defer.inlineCallbacks + def f(): + client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306, + database='science_data_dept', user='science-data-dept', + passwd='datadept1509', ) + cursor = client.cursor(cursors.DictCursor) + cursor.execute(SELECT_RECORD_SQL % {'limit': 1}) + result = cursor.fetchone() + query_id = result['query_id'] + cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,)) + org_results: List[dict] = cursor.fetchall() + result['org_id'] = [org_result['org_id'] for org_result in org_results] + result['org_name'] = [org_result['org_name'] for org_result in org_results] + + init_params = result + yield 
process.crawl(WosLatestIncrementSpider, task_obj=init_params) + + process = CrawlerProcess(get_project_settings()) + f() + process.start() + process.stop() + + +def starter(): + process = CrawlerProcess(get_project_settings()) + process.crawl(WosLatestIncrementSpider) + process.start() + + +if __name__ == '__main__': + starter_latest_all()
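Note (not part of the patch): a minimal, self-contained sketch of how the helpers added above are expected to compose, assuming the science_article_wos package is importable. The search query, the sample hit document, and the citation counts are made up for illustration; only the imported functions and constants come from the files in this patch.

    from urllib.parse import urlencode

    from science_article_wos.configs import wos as config
    from science_article_wos.models import wos_model as model
    from science_article_wos.utils import tools
    from science_article_wos.db_utils.mongo import build_update_query

    # Build the first-page Starter API request URL the way
    # WosLatestIncrementSpider.start() does (query string is hypothetical).
    params = model.starter_documents_get(
        q="OG=(Beijing Forestry University) AND PY=(2023)",
        page=1,
        limit=config.WOS_STARTER_PER_PAGE_LIMIT,
        detail="short",
    )
    url = config.WOS_STARTER_DOCUMENT_API + "?" + urlencode(params, doseq=True)

    # Pull the WOS-core citation count out of one (fabricated) response hit,
    # mirroring the loop in WosLatestIncrementSpider.parse().
    hit = {
        "uid": "WOS:000000000000001",
        "citations": [{"db": "WOS", "count": 12}, {"db": "BCI", "count": 3}],
    }
    cited = tools.get_list_key(array=hit["citations"], target="count", condition=("db", "WOS"))
    assert cited == 12

    # On a duplicate-key insert the Mongo pipelines fall back to an update; with
    # replace=False, list fields accumulate via $addToSet instead of being overwritten.
    update = build_update_query({"task_ids": [42], "cited": 7}, replace=False)
    # -> {"$addToSet": {"task_ids": {"$each": [42]}}, "$set": {"cited": 7}}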