From abdad5b786243d404bd05673301f913dd623b32d Mon Sep 17 00:00:00 2001
From: zhaoxiangpeng <1943364377@qq.com>
Date: Mon, 12 Jan 2026 10:47:51 +0800
Subject: [PATCH] add: shelved changes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../browser/wos_search_export.py              | 296 ++++++++++++++
 .../science_article_add/configs/wos_dp.py     |   0
 .../db_utils/buffer_component.py              |   0
 .../science_article_add/db_utils/mongo.py     |  30 ++
 .../science_article_add/items/__init__.py     |  19 +
 .../middlewares/__init__.py                   |   0
 .../middlewares/lifecycle_middleware.py       |   0
 .../science_article_add/middlewares/wos.py    |  97 +++++
 .../science_article_add/pipelines/mongo.py    |  57 ++-
 .../pipelines/verify_data.py                  |  13 +
 .../science_article_add/pipelines/wos.py      |  15 +-
 .../scripts/get_db_task.py                    |  32 +-
 .../scripts/todo_id_manager.py                |   0
 .../spiders/download_by_qid.py                |  95 +++++
 .../spiders/wos_latest_increment.py           |   4 +
 .../science_article_add/utils/dingtalk.py     | 378 ++++++++++++++++++
 .../science_article_add/utils/get_cookie.py   |   0
 .../science_article_add/utils/get_self_ip.py  |   0
 18 files changed, 1017 insertions(+), 19 deletions(-)
 create mode 100644 science_article_add/science_article_add/browser/wos_search_export.py
 create mode 100644 science_article_add/science_article_add/configs/wos_dp.py
 create mode 100644 science_article_add/science_article_add/db_utils/buffer_component.py
 create mode 100644 science_article_add/science_article_add/middlewares/__init__.py
 create mode 100644 science_article_add/science_article_add/middlewares/lifecycle_middleware.py
 create mode 100644 science_article_add/science_article_add/middlewares/wos.py
 create mode 100644 science_article_add/science_article_add/scripts/todo_id_manager.py
 create mode 100644 science_article_add/science_article_add/spiders/download_by_qid.py
 create mode 100644 science_article_add/science_article_add/utils/dingtalk.py
 create mode 100644 science_article_add/science_article_add/utils/get_cookie.py
 create mode 100644 science_article_add/science_article_add/utils/get_self_ip.py

diff --git a/science_article_add/science_article_add/browser/wos_search_export.py b/science_article_add/science_article_add/browser/wos_search_export.py
new file mode 100644
index 0000000..4b6ddc9
--- /dev/null
+++ b/science_article_add/science_article_add/browser/wos_search_export.py
@@ -0,0 +1,296 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/11/24 09:25
+# @Author : zhaoxiangpeng
+# @File : wos_search_export.py
+import math
+import json
+import logging
+from typing import Any
+from datetime import datetime
+
+import redis
+from DrissionPage import Chromium
+from DrissionPage import ChromiumPage, ChromiumOptions
+from DrissionPage._pages.chromium_tab import ChromiumTab
+from DrissionPage._units.listener import DataPacket, Response
+from DrissionPage.errors import ElementNotFoundError
+
+from science_article_add.utils import tools
+from science_article_add.scripts.wos_parse_data import parse_full_records_txt
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+LINK = "https://webofscience.clarivate.cn/wos/woscc/advanced-search"
+BATCH_DOWNLOAD_LIMIT = 500
+
+
+class Settings:
+    env = "dev"
+    SEARCH_ROUTE = '/api/wosnx/core/runQuerySearch'
+    EXPORT_ROUTE = '/api/wosnx/indic/export/saveToFile'
+    DB_CHANGE_ELE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science Core Collection"]'
+    QUERY_INPUT_ELE = '//*[@id="advancedSearchInputArea"]'
+    SEARCH_BUTTON_ELE = '//button[@data-ta="run-search"]/span[@class="mat-mdc-button-touch-target"]'
+
+    EXPORT_BUTTON_ELE = '//*[@id="export-trigger-btn"]'
+    TABWIN_BUTTON_ELE = '//*[@id="exportToTabWinButton"]'  # tab-delimited file button
+
+    RECORD_TYPE_SELECT_ELE = '//div[@class="ng-star-inserted"]/wos-select/button[@aria-haspopup="listbox"]'  # record-content select box
+    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record"]'  # Full Record option
+    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record and Cited References"]'  # Full Record and Cited References option
+
+    RECORD_RANGE_ELE = '//*[@id="radio3-input"]'  # records-range radio button
+    RECORD_EXPORT_START_ELE = '//input[@name="markFrom"]'
+    RECORD_EXPORT_END_ELE = '//input[@name="markTo"]'
+
+    EXPORT_FILE_ELE = '//*[@id="exportButton"]'
+
+    INPUT_CONTENT = '(OG=(Anhui University of Science & Technology)) AND PY=(2025)'
+
+
+class ProSettings(Settings):
+    # the zh-locale titles below are functional selector data and must stay as-is
+    DB_CHANGE_ELE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science 核心合集"]'
+    EXPORT_BUTTON_ELE = '//button[@id="export-trigger-btn"]'
+    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="完整记录"]'  # Full Record option
+    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="全记录与引用的参考文献"]'  # Full Record and Cited References option
+
+
+settings = Settings()
+
+
+class WosSearchExport:
+    _records_found = 0
+    inited: bool = False
+    is_running = False
+
+    def __init__(self, query_content: Any, options=None):
+        self._records_found = 0
+        self._query_id = None
+        self.query_content = query_content
+        self.options = options
+
+    @classmethod
+    def create_instance(cls, config: dict):
+        return cls(
+            query_content=config.get("query_content"),
+            options=config.get('options')
+        )
+
+    def set_records_found(self, val):
+        self._records_found = val
+
+    def get_records_found(self) -> int:
+        return self._records_found
+
+    def set_query_id(self, query_id):
+        self._query_id = query_id
+
+    def get_query_id(self):
+        return self._query_id
+
+    def _initialize(self):
+        self.browser = Chromium(self.options)
+        self.tab = self.browser.latest_tab
+        # these steps only need to run once
+        self.open_url(LINK)
+        # handle the cookie-consent preferences
+        self.operate_cookie_first()
+        self.change_db()
+        self.inited = True
+
+    def open_url(self, url):
+        logger.debug('Opening url: %s' % url)
+        self.tab.get(url)
+
+    def operate_cookie_first(self):
+        # dismiss the cookie-consent banner if it shows up
+        logger.debug('Operating cookie first...')
+        ck_m_div = self.tab.ele('xpath://*[@id="onetrust-banner-sdk"]')
+        if ck_m_div:
+            ele = self.tab.ele('xpath://*[@id="onetrust-accept-btn-handler"]')
+            ele.click()
+
+    def change_db(self):
+        logger.info('Changing database...')
+        default_db_ele = self.tab.ele('xpath://*[@id="snSelectDb"]/button')
+        c1 = default_db_ele.raw_text
+        default_db_ele.click()
+        self.tab.ele(
+            'xpath:%(xpath)s' % {"xpath": settings.DB_CHANGE_ELE}).click()
+
+    def input_query(self, content: str, clear_input: bool = True, tab=None):
+        tab = tab or self.tab
+        input_area_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.QUERY_INPUT_ELE})
+        if clear_input:
+            input_area_ele.clear()  # empty the input box first
+
+        input_area_ele.input(content)  # type the search query
+
+        def listen_func():
+            tab.listen.start(settings.SEARCH_ROUTE, method="POST")
+
+        def operation_func():
+            search_button_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.SEARCH_BUTTON_ELE})
+            search_button_ele.click()
+
+        def capture_packet(packet: DataPacket):
+            search_url = tab.url
+            record_id, records_found = self.get_record_info(packet.response.body)
+            self.set_records_found(records_found)
+            self.set_query_id(record_id)
+            if not self.get_query_id():
+                logger.warning('QueryID not found in response %s' % packet.response.body)
+
+            if records_found == 0:
+                logger.warning('query "%s" matched %s records' % (self.query_content, records_found))
+                return
+
+            else:
+                logger.info('query "%s" matched %s records' % (self.query_content, records_found))
+
+            return True
+
+        self.intercept(listen=listen_func, operation=operation_func, callback=capture_packet, tab=tab)
+
+    def download_records(self):
+        for b in self.distribute_page():
+            query_id, batch_id, mark_start, mark_end = b
+            self.rpa_download(mark_start, mark_end, batch=batch_id, tab=self.tab)
+
+    def distribute_page(self):
+        # work out the batch ranges
+        logger.info("prepare downloading...")
+        records_found = self.get_records_found()
+        query_id = self.get_query_id()
+        mark_start = 1
+        mark_end = 0
+        batch_id = 0
+        for i in range(math.ceil(records_found / BATCH_DOWNLOAD_LIMIT)):
+            mark_end += BATCH_DOWNLOAD_LIMIT
+            if mark_end > records_found:
+                mark_end = records_found
+            batch_id += 1
+            yield query_id, batch_id, mark_start, mark_end
+
+            mark_start += BATCH_DOWNLOAD_LIMIT
+
+    def clear_query(self):
+        pass
+
+    def refresh_query(self):
+        pass
+
+    def refresh_page(self):
+        pass
+
+    def rpa_download(self, start: int = 1, end: int = 500, batch: str | int = None, tab=None):
+        """
+        Intercept the export API before clicking download.
+        """
+        try:
+            logger.debug("download starting...")
+            tab = tab or self.tab
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_BUTTON_ELE}).click()  # click Export
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.TABWIN_BUTTON_ELE}).click()  # choose tab-delimited file
+            # wait for the dialog
+            # switch the export format to Full Record and Cited References
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_TYPE_SELECT_ELE}).click()
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.FULL_RECORD_REFERENCE_ELE}).click()
+
+            # fill in the record range
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_RANGE_ELE}).click()  # switch to range mode
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_START_ELE}).input(start, clear=True)
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_END_ELE}).input(end, clear=True)
+        except ElementNotFoundError:
+            self.refresh_page()
+
+        def listen_func():
+            tab.listen.start(settings.EXPORT_ROUTE, method="POST")
+
+        def operation_func():
+            # tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_FILE_ELE}).click()  # click the export button
+            # NOTE: DOWNLOAD_PATH is module-level state, set in the __main__ block below
+            tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_FILE_ELE}).click.to_download(
+                save_path=DOWNLOAD_PATH,
+                rename='%s.txt' % batch
+            )
+
+        def capture_packet(packet: DataPacket):
+            g = self._parse_download(packet.response)
+            for i in g:
+                print(i)
+            return True
+
+        self.intercept(listen=listen_func, operation=operation_func, callback=capture_packet, tab=tab)
+
+    def intercept(self, listen, operation, callback, tab=None):
+        listen()
+        operation()
+        for packet in tab.listen.steps(count=3):
+            print(packet.response.body)
+            if not self.intercept_verify(packet):
+                continue
+            r = callback(packet)
+            if r:
+                break
+        return
+
+    @staticmethod
+    def intercept_verify(packet: DataPacket):
+        content = packet.response.body
+        if isinstance(content, bytes) and content.find(b'"Server.passiveVerificationRequired"') != -1:
+            return False
+        else:
+            return True
+
+    def _parse_download(self, response: Response):
+        batch_time = datetime.now()
+        item_g = parse_full_records_txt(response.body.encode())
+        parse_count = 0
+        for data_dic in item_g:
+            t_id = data_dic.pop('ut', None)
+            if t_id:
+                parse_count += 1
+                yield dict(third_id=t_id, exported=data_dic, updated_at=batch_time)
+                # parse the cited-by count
+                if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
+                    yield dict(third_id=t_id, cited=cited_num, updated_at=batch_time)
+
+    @staticmethod
+    def get_record_info(body: bytes):
+        resp_texts = body.strip().split(b'\n')
+        query_id = None
+        records_found = 0
+        for resp_text in resp_texts:
+            resp_row_dict: dict = json.loads(resp_text)
+            if resp_row_dict.get("key") == "searchInfo":
+                query_id = resp_row_dict.get("payload", {}).get("QueryID")
+                records_found = resp_row_dict.get("payload", {}).get("RecordsFound")  # number of records found
+                break  # stop once found
+        return query_id, records_found
+
+    def execute(self):
+        if not self.inited:
+            logger.info('Initializing page')
+            self._initialize()
+        self.input_query(self.query_content)
+        self.download_records()
+
+    def start(self):
+        pass
+
+    def stop(self):
+        self.tab.close()
+
+
+if __name__ == '__main__':
+    DOWNLOAD_PATH = r'Y:\wos-metadata\wos increment-202512\00'
+    conf = dict(
+        query_content="(OG=(Southwest University of Science & Technology - China)) AND PY=(2025)",
+        download_dir=DOWNLOAD_PATH
+    )
+    co = ChromiumOptions()  # .headless()
+    co.set_pref('download.default_directory', conf['download_dir'])
+    conf['options'] = co
+
+    ins = WosSearchExport.create_instance(config=conf)
+    ins.execute()
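The batching arithmetic in `distribute_page` is easiest to sanity-check in isolation. A minimal standalone sketch of the same 1-based, inclusive windowing (the function name is illustrative, not part of the patch):

    import math

    BATCH_DOWNLOAD_LIMIT = 500

    def batch_ranges(records_found: int, limit: int = BATCH_DOWNLOAD_LIMIT):
        # yields (batch_id, mark_start, mark_end): the same values
        # WosSearchExport.distribute_page feeds to rpa_download
        for i in range(math.ceil(records_found / limit)):
            yield i + 1, i * limit + 1, min((i + 1) * limit, records_found)

    print(list(batch_ranges(1234)))
    # -> [(1, 1, 500), (2, 501, 1000), (3, 1001, 1234)]
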
diff --git a/science_article_add/science_article_add/configs/wos_dp.py b/science_article_add/science_article_add/configs/wos_dp.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/db_utils/buffer_component.py b/science_article_add/science_article_add/db_utils/buffer_component.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/db_utils/mongo.py b/science_article_add/science_article_add/db_utils/mongo.py
index b94207d..7b81a9e 100644
--- a/science_article_add/science_article_add/db_utils/mongo.py
+++ b/science_article_add/science_article_add/db_utils/mongo.py
@@ -11,6 +11,36 @@ if TYPE_CHECKING:
     from pymongo.results import InsertManyResult, BulkWriteResult
 
 
+def build_update_query(update_data: dict, replace: bool = True) -> dict:
+    """
+    If replace is True, overwrite the fields of the existing document directly.
+    """
+    update_query = {}
+    if not update_data:
+        return {}
+    for key, val in update_data.items():
+        if replace:
+            update_query.setdefault(
+                "$set", {}
+            ).update(
+                {key: val}
+            )
+        else:
+            if isinstance(val, list):
+                update_query.setdefault(
+                    "$addToSet", {}
+                ).update({
+                    key: {"$each": val}
+                })
+            else:
+                update_query.setdefault(
+                    "$set", {}
+                ).update(
+                    {key: val}
+                )
+    return update_query
+
+
 def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]:
     update_query = {}
     if not update_data:
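For reference, the two shapes `build_update_query` can produce (the values here are invented for the example):

    from science_article_add.db_utils.mongo import build_update_query

    # replace=True: everything goes into a plain $set
    build_update_query({"state": 1, "tags": ["a"]})
    # -> {"$set": {"state": 1, "tags": ["a"]}}

    # replace=False: list values become $addToSet with $each, scalars stay in $set
    build_update_query({"task_ids": [7, 8], "state": 1}, replace=False)
    # -> {"$addToSet": {"task_ids": {"$each": [7, 8]}}, "$set": {"state": 1}}
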
diff --git a/science_article_add/science_article_add/items/__init__.py b/science_article_add/science_article_add/items/__init__.py
index 9ac61e7..f9e821c 100644
--- a/science_article_add/science_article_add/items/__init__.py
+++ b/science_article_add/science_article_add/items/__init__.py
@@ -13,6 +13,25 @@ class ScienceArticleAddItem(scrapy.Item):
     updated_at = scrapy.Field()
 
 
+class AddItemBase(scrapy.Item):
+    third_id = scrapy.Field()
+    updated_at = scrapy.Field()
+
+
+class ArticleItem(AddItemBase):
+    exported = scrapy.Field()
+
+
+class IdRelationItem(AddItemBase):
+    query_ids = scrapy.Field()
+    school_ids = scrapy.Field()
+    task_ids = scrapy.Field()
+
+
+class ArticleCitedItem(AddItemBase):
+    cited = scrapy.Field()
+
+
 class WosLiteAddItem(ScienceArticleAddItem):
     year = scrapy.Field()
     query_ids = scrapy.Field()
diff --git a/science_article_add/science_article_add/middlewares/__init__.py b/science_article_add/science_article_add/middlewares/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/middlewares/lifecycle_middleware.py b/science_article_add/science_article_add/middlewares/lifecycle_middleware.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/middlewares/wos.py b/science_article_add/science_article_add/middlewares/wos.py
new file mode 100644
index 0000000..b5c8692
--- /dev/null
+++ b/science_article_add/science_article_add/middlewares/wos.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/10/23 17:22
+# @Author : zhaoxiangpeng
+# @File : wos.py
+from __future__ import annotations
+from typing import TYPE_CHECKING
+import logging
+
+import redis
+from scrapy.exceptions import CloseSpider
+if TYPE_CHECKING:
+    from scrapy.crawler import Crawler
+    from scrapy import Request
+
+logger = logging.getLogger(__name__)
+
+
+class WosLiteApiXkeyDownloaderMiddleware:
+    async def process_request(self, request, spider):
+        key_param = {
+            'X-ApiKey': '941a216f25cbef0f80ee4ba58a08ef1e19dee7a4'
+        }
+        request.headers.update(key_param)
+        # returning None (not the request itself) lets the request continue
+        # through the middleware chain instead of being rescheduled
+        return None
+
+
+class WosStarterApiXkeyDownloaderMiddleware:
+    async def process_request(self, request, spider):
+        key_param = {
+            'X-ApiKey': '53b8164e7543ccebe489988287e8b871bc2c0880'
+        }
+        request.headers.update(key_param)
+        # return request
+
+
+class WosSidParamMiddleware:
+
+    def __init__(self, redis_uri: str):
+        self.redis_cli = redis.from_url(redis_uri)
+        self.cookiepool_key = 'cookies_pool:wos:sid-sjtu'
+
+    @classmethod
+    def from_crawler(cls, crawler: Crawler, *args, **kwargs):
+        settings = crawler.settings
+        return cls(
+            redis_uri=settings.get("REDIS_URL")
+        )
+
+    def process_request(self, request: Request, spider):
+        has_wos_sid = hasattr(request, 'wos_sid')
+        if not has_wos_sid:
+            sid = self.get_sid_from_redis()
+            if not sid:
+                raise CloseSpider("failed to get a wos sid from redis")
+            # bind the fetched wos_sid to the request so parse callbacks can read it
+            setattr(request, 'wos_sid', sid)
+        else:
+            sid = getattr(request, 'wos_sid')
+        cookie_1 = {'dotmatics.elementalKey': 'SLsLWlMhrHnTjDerSrlG'}
+
+        headers = {
+            'authority': 'webofscience.clarivate.cn',
+            'accept-language': 'zh-CN,zh;q=0.9',
+            'cache-control': 'no-cache',
+            'origin': 'https://webofscience.clarivate.cn',
+            'pragma': 'no-cache',
+            # 'referer': 'https://webofscience.clarivate.cn/wos/woscc/advanced-search',
+        }
+        request.cookies = cookie_1
+
+        if request.url.endswith('runQuerySearch'):
+            # search requests must carry the SID as a query parameter
+            request._set_url(request.url + "?SID=%s" % sid)
+            headers.update(
+                {'accept': 'application/x-ndjson', 'content-type': 'text/plain;charset=UTF-8'})
+        else:
+            headers.update(
+                {'accept': 'application/json, text/plain, */*', 'content-type': 'application/json',
+                 'x-1p-wos-sid': sid})
+        for hk, hv in headers.items():
+            request.headers[hk] = hv
+
+        return None
+
+    def get_sid_from_redis(self):
+        sid = self.redis_cli.get(self.cookiepool_key)
+        if not sid:
+            logger.warning("No usable cookie available!!!")
+            return None
+        return sid.decode()
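Background on the return-value fix above: in Scrapy, a downloader middleware's `process_request` must return `None` to let the request continue, or a `Response`/new `Request` to short-circuit the chain; returning the incoming request reschedules it and can loop forever. A minimal sketch of the header-injection pattern (the key is a placeholder):

    class ApiKeyMiddleware:
        """Downloader middleware that injects a static header."""

        API_KEY = "<api-key placeholder>"

        def process_request(self, request, spider):
            request.headers["X-ApiKey"] = self.API_KEY
            return None  # None -> other middlewares and the downloader proceed
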
diff --git a/science_article_add/science_article_add/pipelines/mongo.py b/science_article_add/science_article_add/pipelines/mongo.py
index d313b56..4feb5e0 100644
--- a/science_article_add/science_article_add/pipelines/mongo.py
+++ b/science_article_add/science_article_add/pipelines/mongo.py
@@ -5,7 +5,7 @@ from __future__ import annotations
 import logging
 from datetime import datetime
-from typing import TYPE_CHECKING, Tuple, Generator
+from typing import TYPE_CHECKING, Tuple, Union
 
 from pymongo import MongoClient
 from itemadapter import ItemAdapter
@@ -15,7 +15,8 @@ from pymongo.errors import (
 )
 
 from science_article_add.db_utils.buffer_component import SimpleBuffer
-from science_article_add.db_utils.mongo import MongoDBUtils, update_document
+from science_article_add.db_utils.mongo import MongoDBUtils, update_document, build_update_query
+
 if TYPE_CHECKING:
     from scrapy.crawler import Crawler
     from scrapy.statscollectors import StatsCollector
@@ -51,14 +52,17 @@ class MongoPipeline(MongoDBUtils):
         d = adapter.asdict()
         try:
             insert_result = collection.insert_one(d)
+            self.stats.inc_value("item2db_inserted/{}".format(item_type))
         except DuplicateKeyError as duplicate_error:
             if self.insert_failure_update_enable:
                 write_error = duplicate_error.details
                 key_pattern = write_error.get('keyPattern')
                 key_value = write_error.get('keyValue')
                 logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
+                d.pop("_id", None)
                 [d.pop(k, None) for k in key_pattern.keys()]
                 up_result = collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
+                self.stats.inc_value("item2db_updated/{}".format(item_type))
         except Exception:
             raise
@@ -71,20 +75,22 @@ class MongoPipeline(MongoDBUtils):
     def _get_item_type(item) -> str:
         """Get the item type."""
         if hasattr(item, '__tablename__'):
-            return item.item_type
+            return item.__class__.__tablename__
         return 'items_null_table'
 
 
 class MongoPipelineMulti(MongoDBUtils):
-    def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
+    def __init__(self, mongo_uri, mongo_db, stats: StatsCollector, buffer_max_size=None):
         super().__init__(mongo_uri, mongo_db)
         self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
+        self.stats: StatsCollector = stats
 
     @classmethod
     def from_crawler(cls, crawler):
         return cls(
             mongo_uri=crawler.settings.get("MONGO_URI"),
             mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
+            stats=crawler.stats,
             buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
         )
@@ -127,11 +133,15 @@ class MongoPipelineMulti(MongoDBUtils):
             write_errors = bulk_write_e.details.get('writeErrors')
             current_time = datetime.now()
             up_time_requests = []
-            errors = self._build__update(write_errors)
             collection = self.db.get_collection(item_type)
-            for new_item in errors:
-                filter_query, update_query = new_item
-                up_result = collection.update_one(filter=filter_query, update=update_query)
+            for write_error in write_errors:
+                filter_query, update_query = self._pick_filter_update(write_error)
+                original_doc = write_error.get('op')  # the document whose insert failed
+                task_ids = update_query.pop('task_ids', None)
+                if task_ids:
+                    task_id_query = {'task_ids': task_ids}
+                    collection.update_one(filter=filter_query, update=build_update_query(task_id_query, replace=False))
+                up_result = collection.update_one(filter=filter_query, update=build_update_query(update_query, replace=False))
 
                 affect_count -= 1
                 if up_result.matched_count == up_result.modified_count == 1:
@@ -149,16 +159,29 @@ class MongoPipelineMulti(MongoDBUtils):
         finally:
             # clear the buffer
             self.buffer.clear_buffer(item_type)
+            self.stats.inc_value("item2db_inserted/{}".format(item_type), count=affect_count)
+            self.stats.inc_value("item2db_updated/{}".format(item_type), count=item_count - affect_count)
             logger.info('✅ stored %s: %s rows, %s inserted, %s updated' % (
-            item_type, item_count, affect_count, item_count - affect_count))
-
-    def _build__update(self, write_errors) -> Generator[Tuple[dict, dict], Tuple[None, None]]:
-        for write_error in write_errors:
-            update_one = None, None
-            if write_error.get('code') == 11000:
-                update_one = self._build_dup_error(write_error)
-            if update_one:
-                yield update_one
+                item_type, item_count, affect_count, item_count - affect_count))
+
+    def _build__update(self, write_error) -> Union[Tuple[dict, dict], Tuple[None, None]]:
+        update_one = None, None
+        if write_error.get('code') == 11000:
+            update_one = self._pick_filter_update(write_error)
+        return update_one
+
+    @staticmethod
+    def _pick_filter_update(write_error):
+        original_doc = write_error.get('op')  # the document whose insert failed
+        key_pattern = write_error.get('keyPattern')
+        original_doc.pop("_id", None)  # drop the _id created by the failed insert
+        filter_query = {}
+        update_query = {key: val for key, val in original_doc.items() if val}
+        update_query.pop('updated_at', None)  # drop the volatile timestamp so it cannot skew the update
+
+        for key in key_pattern.keys():
+            filter_query.update({key: update_query.pop(key, None)})
+        return filter_query, update_query
 
     @staticmethod
     def _build_dup_error(write_error) -> tuple[None, None] | tuple[dict, dict]:
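To make the duplicate-key fallback concrete, this is roughly how `_pick_filter_update` dissects a simulated E11000 write error (the document and key pattern are invented for illustration):

    write_error = {
        "code": 11000,
        "keyPattern": {"third_id": 1},
        "op": {"_id": "650f...", "third_id": "WOS:000123", "cited": 5,
               "updated_at": "2026-01-12T10:47:51"},
    }

    # _pick_filter_update drops _id and updated_at, moves the unique-key
    # fields out of the update payload, and returns:
    # filter_query = {"third_id": "WOS:000123"}
    # update_query = {"cited": 5}
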
diff --git a/science_article_add/science_article_add/pipelines/verify_data.py b/science_article_add/science_article_add/pipelines/verify_data.py
index b8cfdd6..dad555c 100644
--- a/science_article_add/science_article_add/pipelines/verify_data.py
+++ b/science_article_add/science_article_add/pipelines/verify_data.py
@@ -65,3 +65,16 @@ class VerifyDataIntegrity:
             coll.update_many(filter={"third_id": {"$in": list(failure)}}, update={"$set": {"state": -1}})
         else:
             self.logger.info("Successfully verified: %s" % "download complete, no anomalies")
+
+    def spider_end(self):
+        """
+        Combine the search query results and write them back to the database.
+        """
+        # placeholder for the result record to persist; nothing is written yet
+        dict(
+            content="",
+            query_id="",
+            records_found=0,
+            perfect=1,
+            state=1,
+            reason=""
+        )
diff --git a/science_article_add/science_article_add/pipelines/wos.py b/science_article_add/science_article_add/pipelines/wos.py
index c72c670..67daab9 100644
--- a/science_article_add/science_article_add/pipelines/wos.py
+++ b/science_article_add/science_article_add/pipelines/wos.py
@@ -1,7 +1,11 @@
 # pipelines.py
+import logging
 import pymongo
 from itemadapter import ItemAdapter
-from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem
+
+from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
+from science_article_add.pipelines.verify_data import VerifyDataIntegrity
+logger = logging.getLogger(__name__)
 
 
 class MongoDBPipeline:
@@ -38,3 +42,12 @@ class MongoDBPipeline:
         self.db[collection_name].insert_one(dict(adapter))
 
         return item
+
+
+class WosVerifyDataIntegrity(VerifyDataIntegrity):
+
+    def open_spider(self, spider):
+        spider_batch_ids = spider.get_batch_ids()
+        for batch in spider_batch_ids:
+            if batch.get("field") == "UT":
+                self.batch_ids.add(batch.get("third_id"))
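`WosVerifyDataIntegrity.open_spider` assumes `spider.get_batch_ids()` yields dicts carrying a `field` and a `third_id`, and keeps only the UT (WOS accession number) batches. A tiny illustration with made-up data:

    batches = [
        {"field": "UT", "third_id": "WOS:000111"},
        {"field": "DO", "third_id": "10.1000/xyz"},  # ignored: not a UT batch
        {"field": "UT", "third_id": "WOS:000222"},
    ]

    batch_ids = set()
    for batch in batches:
        if batch.get("field") == "UT":
            batch_ids.add(batch.get("third_id"))

    assert batch_ids == {"WOS:000111", "WOS:000222"}
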
diff --git a/science_article_add/science_article_add/scripts/get_db_task.py b/science_article_add/science_article_add/scripts/get_db_task.py
index a613c1f..40f0ed8 100644
--- a/science_article_add/science_article_add/scripts/get_db_task.py
+++ b/science_article_add/science_article_add/scripts/get_db_task.py
@@ -22,10 +22,12 @@ class TaskManager:
     def get_task_from_mysql(self):
         cursor = self.client.cursor()
         record_fields = ['id', 'batch_date', 'query_id', 'task_condition', 'is_done']
-        sql = "select %(fields)s from task_batch_record" % {'fields': ', '.join(record_fields)}
+        sql = "select %(fields)s from task_batch_record where is_done=0" % {'fields': ', '.join(record_fields)}
         try:
             cursor.execute(sql)
             result = cursor.fetchone()
+            if result is None:
+                return
             task_record_dic = dict(zip(record_fields, result))
             fill = dict(zip(STRATEGY_FIELDS, STRATEGY_FIELDS))
             fill.update(q_id=task_record_dic.get("query_id"))
@@ -43,6 +45,34 @@ class TaskManager:
         finally:
             cursor.close()
 
+    def create_task_from_mysql(self, school_name=None, school_id=None):
+        cursor = self.client.cursor()
+        sql = """
+        SELECT
+            r.org_id,
+            q.id,
+            q.content,
+            q.param,
+            q.interval_unit,
+            q.disable_flag,
+            q.state
+        FROM
+            relation_org_query AS r
+            JOIN task_search_strategy AS q ON r.query_id = q.id
+        WHERE
+            r.org_name="%(school_name)s"
+            AND source_type = 1
+            AND disable_flag = 0""" % {'school_name': school_name}
+        try:
+            cursor.execute(sql)
+            result = cursor.fetchone()
+
+            # TODO: unfinished stub -- the target table name is still blank and
+            # batch_date / task_condition / result_count / is_done are not yet
+            # derived from the fetched row, so this insert cannot run as-is
+            sql = "insert into %s (batch_date, query_id, task_condition, result_count, is_done, created_time) values ('%s', %s, '%s', %s, %s, CURRENT_TIMESTAMP)" % (
+                "", batch_date, query_id, task_condition, result_count, is_done
+            )
+        except Exception as exc:
+            pass  # TODO: log instead of swallowing the error
+
 
 if __name__ == '__main__':
     tm = TaskManager()
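Both SQL statements above splice values in with `%` string formatting, which is quoting-fragile and injection-prone. If `self.client` is a DB-API connection (e.g. PyMySQL), a safer sketch passes the parameters separately; table and column names are taken from the patch, the helper name is made up:

    def fetch_org_queries(cursor, school_name):
        sql = (
            "SELECT r.org_id, q.id, q.content, q.param, q.interval_unit, "
            "q.disable_flag, q.state "
            "FROM relation_org_query AS r "
            "JOIN task_search_strategy AS q ON r.query_id = q.id "
            "WHERE r.org_name = %s AND source_type = 1 AND disable_flag = 0"
        )
        # the driver quotes and escapes the value itself
        cursor.execute(sql, (school_name,))
        return cursor.fetchall()
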
diff --git a/science_article_add/science_article_add/scripts/todo_id_manager.py b/science_article_add/science_article_add/scripts/todo_id_manager.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/spiders/download_by_qid.py b/science_article_add/science_article_add/spiders/download_by_qid.py
new file mode 100644
index 0000000..f951d3a
--- /dev/null
+++ b/science_article_add/science_article_add/spiders/download_by_qid.py
@@ -0,0 +1,95 @@
+from typing import Any, List, Union
+from datetime import datetime
+
+import scrapy
+from scrapy.http import Response
+from scrapy.http.request.json_request import JsonRequest
+from scrapy.crawler import Crawler
+
+from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
+from science_article_add.scripts.wos_parse_data import parse_full_records
+from science_article_add.models import wos_model as model
+from science_article_add.utils import tools
+from science_article_add.configs import wos as config
+
+
+def maybe_list(val: Union[int, List[int]]) -> List[int]:
+    if isinstance(val, int):
+        return [val]
+    return list(val)
+
+
+class DownloadByQidSpider(scrapy.Spider):
+    name = "download_by_qid"
+
+    custom_settings = dict(
+        DOWNLOADER_MIDDLEWARES={
+            "science_article_add.middlewares.wos.WosSidParamMiddleware": 500
+        },
+        ITEM_PIPELINES={
+            "science_article_add.pipelines.mongo.MongoPipeline": 300,
+        },
+        LOG_LEVEL="INFO"
+    )
+
+    @classmethod
+    def from_crawler(cls, crawler, *args, **kwargs):
+        return super().from_crawler(crawler, *args, **kwargs)
+
+    def __init__(self, record_id: str, mark_from: int = 1, mark_to: int = 500, records_found: int = None, **kwargs):
+        super().__init__()
+        self.record_id = record_id
+        self.records_found = records_found
+        self.mark_from = mark_from
+        self.mark_to = mark_to
+        self.task_id = None
+        self.org_id = None
+        self.query_id = None
+        self.bind_relation_enable = False
+        self.bind_relation_d = None
+        if self.bind_relation_enable:
+            self.build_relation()
+
+    def build_relation(self):
+        bind_relation_d = dict()
+        if self.task_id:
+            bind_relation_d.setdefault("task_ids", maybe_list(self.task_id))
+        if self.org_id:
+            bind_relation_d.setdefault("school_ids", maybe_list(self.org_id))
+        if self.query_id:
+            bind_relation_d.setdefault("query_ids", maybe_list(self.query_id))
+        self.bind_relation_d = bind_relation_d
+        return bind_relation_d
+
+    async def start(self):
+        query_id = self.record_id
+        records_found = self.records_found
+        mark_start = self.mark_from
+        mark_end = self.mark_to
+        yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
+                          data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
+                                                               mark_to=mark_end),
+                          callback=self.download_parse)
+
+    def download_parse(self, response: Response, **kwargs: Any) -> Any:
+        parse_count = 0
+        batch_time = datetime.now()
+        records = parse_full_records(response.body)
+        for data_dic in records:
+            t_id = data_dic.pop('ut', None)
+            if t_id:
+                parse_count += 1
+                article_item = WosArticleItem()
+                article_item['third_id'] = t_id
+                article_item['exported'] = data_dic
+                article_item['updated_at'] = batch_time
+                yield article_item
+                # parse the cited-by count
+                if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
+                    cited_item = WosCitedNumberItem()
+                    cited_item['third_id'] = t_id
+                    cited_item['cited'] = cited_num
+                    cited_item['updated_at'] = batch_time
+                    yield cited_item
+                if self.bind_relation_enable and self.bind_relation_d:
+                    # relations are bound only when the bind-relation switch is enabled
+                    relation_item = WosIdRelationItem()
+                    relation_item['third_id'] = t_id
+                    relation_item.update(**self.bind_relation_d)
+                    yield relation_item
diff --git a/science_article_add/science_article_add/spiders/wos_latest_increment.py b/science_article_add/science_article_add/spiders/wos_latest_increment.py
index ca2e969..8c731e8 100644
--- a/science_article_add/science_article_add/spiders/wos_latest_increment.py
+++ b/science_article_add/science_article_add/spiders/wos_latest_increment.py
@@ -28,6 +28,10 @@ class WosLatestIncrementSpider(scrapy.Spider):
             "science_article_add.pipelines.mongo.MongoPipelineMulti": 300,
             "science_article_add.pipelines.duptodo.DupTodoPipeline": 400,
         },
+        EXTENSIONS={
+            "science_article_add.extensions.ackextension.ACKExtension": 0,
+            # "science_article_add.extensions.dingtalk_extension.DingTalkExtension": 0,
+        },
         LOG_LEVEL="INFO"
     )
     source = "wos"
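A possible way to drive the new spider programmatically; the Mongo/Redis URIs are placeholders, and `record_id` takes the QueryID captured from `runQuerySearch`:

    from scrapy.crawler import CrawlerProcess

    from science_article_add.spiders.download_by_qid import DownloadByQidSpider

    process = CrawlerProcess(settings={
        "MONGO_URI": "mongodb://localhost:27017",   # placeholder
        "MONGO_DATABASE": "items",                  # placeholder
        "REDIS_URL": "redis://localhost:6379/0",    # used by WosSidParamMiddleware
    })
    process.crawl(DownloadByQidSpider, record_id="<QueryID>", mark_from=1, mark_to=500)
    process.start()
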
diff --git a/science_article_add/science_article_add/utils/dingtalk.py b/science_article_add/science_article_add/utils/dingtalk.py
new file mode 100644
index 0000000..f4844c3
--- /dev/null
+++ b/science_article_add/science_article_add/utils/dingtalk.py
@@ -0,0 +1,378 @@
+import asyncio
+import aiohttp
+from typing import Dict, List, Any, Optional
+from enum import Enum
+import logging
+from dataclasses import dataclass
+import time
+
+logger = logging.getLogger(__name__)
+
+
+class DingTalkMessageType(Enum):
+    """DingTalk message type enum."""
+    TEXT = "text"
+    LINK = "link"
+    MARKDOWN = "markdown"
+    ACTION_CARD = "actionCard"
+    FEED_CARD = "feedCard"
+
+
+@dataclass
+class DingTalkConfig:
+    """DingTalk configuration dataclass."""
+    webhook: str
+    secret: Optional[str] = None
+    at_mobiles: Optional[List[str]] = None
+    at_user_ids: Optional[List[str]] = None
+    at_all: bool = False
+
+
+class DingTalkSender:
+    """
+    DingTalk message sender.
+
+    Features:
+    1. Supports several message types: text, link, Markdown, ActionCard, FeedCard
+    2. Supports @-mentioning specific users or everyone
+    3. Supports the signed-webhook security setting
+    4. Supports async sending and batch sending
+    5. Built-in retry and error handling
+    """
+
+    def __init__(self, config: DingTalkConfig):
+        """
+        Initialize the DingTalk message sender.
+
+        Args:
+            config: DingTalk robot configuration
+        """
+        self.config = config
+        self.session: Optional[aiohttp.ClientSession] = None
+        self._retry_count = 3
+        self._retry_delay = 1
+
+    async def __aenter__(self):
+        """Async context manager entry."""
+        await self._ensure_session()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """Async context manager exit."""
+        await self.close()
+
+    async def _ensure_session(self):
+        """Make sure the session exists."""
+        if self.session is None:
+            self.session = aiohttp.ClientSession(
+                timeout=aiohttp.ClientTimeout(total=10)
+            )
+
+    async def close(self):
+        """Close the session."""
+        if self.session:
+            await self.session.close()
+            self.session = None
+
+    def _generate_signature(self, timestamp: int) -> str:
+        """
+        Generate the webhook signature.
+
+        Args:
+            timestamp: timestamp in milliseconds
+
+        Returns:
+            The signature string.
+        """
+        if not self.config.secret:
+            return ""
+
+        import hmac
+        import hashlib
+        import base64
+        import urllib.parse
+
+        string_to_sign = f"{timestamp}\n{self.config.secret}"
+        hmac_code = hmac.new(
+            self.config.secret.encode('utf-8'),
+            string_to_sign.encode('utf-8'),
+            digestmod=hashlib.sha256
+        ).digest()
+
+        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
+        return sign
+
+    def _build_webhook_url(self) -> str:
+        """
+        Build the full webhook URL (including the signature).
+
+        Returns:
+            The complete webhook URL.
+        """
+        if not self.config.secret:
+            return self.config.webhook
+
+        timestamp = int(time.time() * 1000)
+        sign = self._generate_signature(timestamp)
+        return f"{self.config.webhook}&timestamp={timestamp}&sign={sign}"
+
+    def _build_at_info(self) -> Dict[str, Any]:
+        """
+        Build the @-mention info.
+
+        Returns:
+            The @-mention dict.
+        """
+        at_info = {}
+        if self.config.at_mobiles:
+            at_info["atMobiles"] = self.config.at_mobiles
+        if self.config.at_user_ids:
+            at_info["atUserIds"] = self.config.at_user_ids
+        if self.config.at_all:
+            at_info["isAtAll"] = True
+
+        return at_info
+
+    async def _send_request(self, data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Send a request to DingTalk.
+
+        Args:
+            data: request payload
+
+        Returns:
+            The response data.
+
+        Raises:
+            Exception: raised when sending fails.
+        """
+        await self._ensure_session()
+
+        webhook_url = self._build_webhook_url()
+        headers = {
+            "Content-Type": "application/json",
+            "User-Agent": "DingTalk-Bot/1.0"
+        }
+
+        last_exception = None
+        for attempt in range(self._retry_count):
+            try:
+                logger.info(f"Sending DingTalk message, attempt {attempt + 1}/{self._retry_count}")
+
+                async with self.session.post(
+                        webhook_url,
+                        json=data,
+                        headers=headers
+                ) as response:
+                    result = await response.json()
+
+                    if response.status == 200 and result.get("errcode") == 0:
+                        logger.info("DingTalk message sent successfully")
+                        return result
+                    else:
+                        error_msg = f"DingTalk message failed: {result.get('errmsg', 'Unknown error')}"
+                        logger.error(error_msg)
+                        last_exception = Exception(error_msg)
+
+            except asyncio.TimeoutError:
+                error_msg = f"DingTalk message timed out, attempt {attempt + 1}/{self._retry_count}"
+                logger.warning(error_msg)
+                last_exception = Exception(error_msg)
+            except Exception as e:
+                error_msg = f"DingTalk message error: {str(e)}, attempt {attempt + 1}/{self._retry_count}"
+                logger.error(error_msg)
+                last_exception = e
+
+            # if this is not the last attempt, wait before retrying
+            if attempt < self._retry_count - 1:
+                await asyncio.sleep(self._retry_delay * (attempt + 1))
+
+        # all retries failed, raise
+        raise last_exception or Exception("DingTalk message failed to send")
+    async def send_text(self, content: str, at_mobiles: Optional[List[str]] = None,
+                        at_user_ids: Optional[List[str]] = None, at_all: Optional[bool] = None) -> Dict[str, Any]:
+        """
+        Send a text message.
+
+        Args:
+            content: message content
+            at_mobiles: list of mobile numbers to @
+            at_user_ids: list of user IDs to @
+            at_all: whether to @ everyone
+
+        Returns:
+            The send result.
+        """
+        at_info = self._build_at_info()
+        # override the default @ settings
+        if at_mobiles is not None:
+            at_info["atMobiles"] = at_mobiles
+        if at_user_ids is not None:
+            at_info["atUserIds"] = at_user_ids
+        if at_all is not None:
+            at_info["isAtAll"] = at_all
+
+        data = {
+            "msgtype": DingTalkMessageType.TEXT.value,
+            "text": {
+                "content": content
+            },
+            "at": at_info
+        }
+
+        return await self._send_request(data)
+
+    async def send_markdown(self, title: str, text: str, at_mobiles: Optional[List[str]] = None,
+                            at_user_ids: Optional[List[str]] = None, at_all: Optional[bool] = None) -> Dict[str, Any]:
+        """
+        Send a Markdown message.
+
+        Args:
+            title: message title
+            text: message body in Markdown
+            at_mobiles: list of mobile numbers to @
+            at_user_ids: list of user IDs to @
+            at_all: whether to @ everyone
+
+        Returns:
+            The send result.
+        """
+        at_info = self._build_at_info()
+        if at_mobiles is not None:
+            at_info["atMobiles"] = at_mobiles
+        if at_user_ids is not None:
+            at_info["atUserIds"] = at_user_ids
+        if at_all is not None:
+            at_info["isAtAll"] = at_all
+
+        data = {
+            "msgtype": DingTalkMessageType.MARKDOWN.value,
+            "markdown": {
+                "title": title,
+                "text": text
+            },
+            "at": at_info
+        }
+
+        return await self._send_request(data)
+
+    async def send_link(self, title: str, text: str, message_url: str,
+                        pic_url: Optional[str] = None) -> Dict[str, Any]:
+        """
+        Send a link message.
+
+        Args:
+            title: message title
+            text: message content
+            message_url: URL opened when the message is clicked
+            pic_url: image URL
+
+        Returns:
+            The send result.
+        """
+        data = {
+            "msgtype": DingTalkMessageType.LINK.value,
+            "link": {
+                "title": title,
+                "text": text,
+                "messageUrl": message_url,
+            }
+        }
+
+        if pic_url:
+            data["link"]["picUrl"] = pic_url
+
+        return await self._send_request(data)
+
+    async def send_action_card(self, title: str, text: str, single_title: str,
+                               single_url: str, btn_orientation: str = "0") -> Dict[str, Any]:
+        """
+        Send a whole-card-jump ActionCard message.
+
+        Args:
+            title: message title
+            text: message content
+            single_title: title of the single button
+            single_url: URL the single button jumps to
+            btn_orientation: button layout, 0 - vertical, 1 - horizontal
+
+        Returns:
+            The send result.
+        """
+        data = {
+            "msgtype": DingTalkMessageType.ACTION_CARD.value,
+            "actionCard": {
+                "title": title,
+                "text": text,
+                "singleTitle": single_title,
+                "singleURL": single_url,
+                "btnOrientation": btn_orientation
+            }
+        }
+
+        return await self._send_request(data)
+
+    async def send_feed_card(self, links: List[Dict[str, str]]) -> Dict[str, Any]:
+        """
+        Send a FeedCard message.
+
+        Args:
+            links: list of links, each containing title, messageURL and picURL
+
+        Returns:
+            The send result.
+        """
+        data = {
+            "msgtype": DingTalkMessageType.FEED_CARD.value,
+            "feedCard": {
+                "links": links
+            }
+        }
+
+        return await self._send_request(data)
+
+    async def send_alert(self, title: str, message: str, level: str = "info",
+                         at_users: bool = False) -> Dict[str, Any]:
+        """
+        Send an alert message (convenience method).
+
+        Args:
+            title: alert title
+            message: alert content
+            level: alert level (info, warning, error, critical)
+            at_users: whether to @ the configured people
+
+        Returns:
+            The send result.
+        """
+        level_emojis = {
+            "info": "ℹ️",
+            "warning": "⚠️",
+            "error": "❌",
+            "critical": "🚨"
+        }
+
+        emoji = level_emojis.get(level, "ℹ️")
+
+        markdown_text = f"""
+## {emoji} {title}
+
+**Level**: {level.upper()}
+**Time**: {time.strftime('%Y-%m-%d %H:%M:%S')}
+
+**Details**:
+{message}
+        """.strip()
+
+        at_all = at_users and self.config.at_all
+        at_mobiles = self.config.at_mobiles if at_users else None
+        at_user_ids = self.config.at_user_ids if at_users else None
+
+        return await self.send_markdown(
+            title=f"{emoji} {title}",
+            text=markdown_text,
+            at_mobiles=at_mobiles,
+            at_user_ids=at_user_ids,
+            at_all=at_all
+        )
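A possible usage sketch for the sender; the webhook token and secret are placeholders:

    import asyncio

    from science_article_add.utils.dingtalk import DingTalkConfig, DingTalkSender

    async def main():
        config = DingTalkConfig(
            webhook="https://oapi.dingtalk.com/robot/send?access_token=<token>",  # placeholder
            secret="SEC<placeholder>",
        )
        # the async context manager opens and closes the aiohttp session
        async with DingTalkSender(config) as sender:
            await sender.send_alert("spider finished",
                                    "wos_latest_increment stored 1,234 items",
                                    level="info")

    asyncio.run(main())
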
diff --git a/science_article_add/science_article_add/utils/get_cookie.py b/science_article_add/science_article_add/utils/get_cookie.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/utils/get_self_ip.py b/science_article_add/science_article_add/utils/get_self_ip.py
new file mode 100644
index 0000000..e69de29