zhaoxiangpeng 3 weeks ago
parent e48c1cc704
commit 12e9ed53a9

@ -0,0 +1,3 @@
sqlalchemy~=1.3.24
scrapy~=2.13.3
itemadapter~=0.11.0

@ -0,0 +1,13 @@
# my_scrapy_project/models/base.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, DateTime
from datetime import datetime
Base = declarative_base()
class BaseModel(Base):
"""基础模型类"""
__abstract__ = True
id = Column(Integer, primary_key=True, autoincrement=True)
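A hedged sketch of a concrete model built on BaseModel, for context only: the column set mirrors the SearchRecord fields the spider below reads (record_id, records_found, mark_from, mark_to, shard, state, reason), but the real model lives in science_article_wos/dao/models/search_record.py and may differ.
from sqlalchemy import Column, Integer, String, DateTime

class SearchRecordSketch(BaseModel):
    """Illustrative only; not the project's actual SearchRecord model."""
    __tablename__ = 'search_record_sketch'
    record_id = Column(String(64), index=True)        # WOS query/record id
    records_found = Column(Integer)                    # total hits for the query
    mark_from = Column(Integer)                        # export range start
    mark_to = Column(Integer)                          # export range end
    shard = Column(Integer)                            # shard index
    state = Column(String(16), default='pending')      # pending / processing / done
    reason = Column(String(255), nullable=True)
    created_at = Column(DateTime, default=datetime.now)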

@ -0,0 +1,225 @@
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
from __future__ import annotations
import json
from typing import TYPE_CHECKING
import redis
from scrapy import signals, Spider
from scrapy.exceptions import CloseSpider
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy import Request
class ScienceArticleWosSpiderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(self, response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, or item objects.
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Request or item objects.
pass
async def process_start(self, start):
# Called with an async iterator over the spider start() method or the
# matching method of an earlier spider middleware.
async for item_or_request in start:
yield item_or_request
def spider_opened(self, spider):
spider.logger.info("Spider opened: %s" % spider.name)
class ScienceArticleWosDownloaderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.
# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
return None
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.
# Must either:
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info("Spider opened: %s" % spider.name)
class WosCookieMiddleware:
def __init__(self, redis_uri: str):
self.redis_cli = redis.from_url(redis_uri, decode_responses=True)
self.redis_key_prefix = 'cookie_pool:wos_sid'
self.cookiepool_cache_key = 'cookie_pool:wos:sid_q'
@classmethod
def from_crawler(cls, crawler: Crawler, *args, **kwargs):
settings = crawler.settings
middle = cls(
redis_uri=settings.get("REDIS_URL")
)
crawler.signals.connect(middle.open_spider, signal=signals.spider_opened)
crawler.signals.connect(middle.close_spider, signal=signals.spider_closed)
return middle
def open_spider(self, spider: Spider):
self.loading_sid_from_redis()
def close_spider(self, spider: Spider, reason: str = None):
self.del_sid_from_redis()
def process_request(self, request: Request, spider):
req_wos_sid = request.meta.get('wos_sid')
if not req_wos_sid:
sid = self.get_sid_from_redis()
if not sid:
raise CloseSpider(f"没有获取到sid即将退出")
# 把获取到的wos_sid绑定到request可以在parse方法中获取到wos_sid的值
request.meta['wos_sid'] = sid
else:
sid = req_wos_sid
cookie_1 = {'dotmatics.elementalKey': 'SLsLWlMhrHnTjDerSrlG'}
headers = {
'authority': 'webofscience.clarivate.cn',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'origin': 'https://webofscience.clarivate.cn',
'pragma': 'no-cache',
# 'referer': 'https://webofscience.clarivate.cn/wos/woscc/advanced-search',
}
request.cookies = cookie_1
if request.url.endswith('runQuerySearch'):
# Search requests must carry the SID as a query parameter
request._set_url(request.url + "?SID=%s" % sid)
headers.update(
{'accept': 'application/x-ndjson', 'content-type': 'text/plain;charset=UTF-8'})
else:
headers.update(
{'accept': 'application/json, text/plain, */*', 'content-type': 'application/json',
'x-1p-wos-sid': sid})
for hk, hv in headers.items():
request.headers[hk] = hv
return None
def process_response(self, request, response, spider):
if response.status != 200:
self.mark_sid_status(request.meta.get('wos_sid'))
return response
def get_sid_from_redis(self):
val = self.redis_cli.rpoplpush(self.cookiepool_cache_key, self.cookiepool_cache_key)
if val:
self.redis_cli.hincrby(f'{self.redis_key_prefix}:{val}', 'used_times', 1)
return val
return None
def mark_sid_status(self, sid: str, status: str = 'validate'):
"""
:param sid:
:param status: validate/expired
:return:
"""
if status == "expired":
# 过期直接删除key
self.redis_cli.delete(f'{self.cookiepool_cache_key}:{sid}')
else:
self.redis_cli.hset(f'{self.redis_key_prefix}:{sid}', 'status', status)
def loading_sid_from_redis(self) -> list:
"""
Load all valid SIDs into the rotation list so they can be taken from the cache queue.
:return:
"""
valid_sid = []
keys = self.redis_cli.keys(f'{self.redis_key_prefix}:*')
for key in keys:
# Fetch the full hash for this sid
key_obj: dict = self.redis_cli.hgetall(key)
if key_obj.get("status") == "normal":
real_sid = key.rsplit(':', maxsplit=1)[-1]
valid_sid.append(real_sid)
self.redis_cli.lpush(self.cookiepool_cache_key, real_sid)
return valid_sid
def del_sid_from_redis(self):
self.redis_cli.delete(self.cookiepool_cache_key)
class A:
def __init__(self, redis_cli):
self.redis_cli = redis_cli
def load_keys(self, name):
return self.redis_cli.keys(r'cookie_pool:wos_sid:*')
def get_one_sid(self, name):
return self.redis_cli.rpoplpush(name, name)  # rotate: pop from the tail, push back to the head
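For reference, the project-level way to wire this middleware in (the spider further down does the same thing via custom_settings); the Redis URL shown is a placeholder:
# settings.py sketch: register WosCookieMiddleware and point it at the Redis
# instance holding the SID pool. Priority 500 matches the spider's custom_settings.
DOWNLOADER_MIDDLEWARES = {
    "science_article_wos.middlewares.WosCookieMiddleware": 500,
}
REDIS_URL = "redis://:password@localhost:6379/0"  # placeholder credentials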

@ -0,0 +1,409 @@
# -*- coding: utf-8 -*-
# @Time : 2025/12/16 15:24
# @Author : zhaoxiangpeng
# @File : cookie_manager.py
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Generator
import logging
import time
import threading
from datetime import datetime, timedelta
from typing import Optional, Callable
import redis
import requests
from DrissionPage import Chromium
from science_article_wos.utils.xpath_cfg import Settings
if TYPE_CHECKING:
from DrissionPage import ChromiumPage, ChromiumOptions
from scrapy_drissionpage.response import DrissionResponse
from DrissionPage._pages.chromium_tab import ChromiumTab
from DrissionPage._units.listener import DataPacket, Response
VERIFY_ROUTER = "/api/wosnx/core/verify"
settings = Settings()
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def get_self_ip():
"""获取当前IP地址"""
try:
resp = requests.get("https://www.httpbin.org/ip", timeout=10)
assert resp.status_code == 200
data = resp.json()
ipaddr = data['origin']
return ipaddr
except Exception as e:
logger.error(f"获取IP失败: {str(e)}")
return "unknown"
def intercept(listen, operation, callback, tab=None):
listen()
operation()
for packet in tab.listen.steps(count=3):
if not intercept_verify(packet):
continue
r = callback(packet)
if isinstance(r, Generator):
return r
else:
if isinstance(r, bool):
break
return
def intercept_verify(packet: DataPacket):
content = packet.response.body
if isinstance(content, bytes) and content.find(b'"Server.passiveVerificationRequired"') != -1:
return False
else:
return True
class DPOperations:
def __init__(self, browser, tab):
self.browser = browser
self.tab = tab
@staticmethod
def operate_cookie_first(tab):
# Handle the cookie-consent banner if it pops up
logger.debug('Dismissing cookie consent banner...')
ck_m_div = tab.ele('xpath://*[@id="onetrust-banner-sdk"]')
if ck_m_div:
ele = tab.ele('xpath://*[@id="onetrust-accept-btn-handler"]')
ele.click()
@staticmethod
def change_db(tab):
logger.info('Changing database...')
default_db_ele = tab.ele('xpath://*[@id="snSelectDb"]/button')
c1 = default_db_ele.raw_text
default_db_ele.click()
xpath = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science Core Collection"]'
tab.ele(
'xpath:%(xpath)s' % {"xpath": xpath}).click()
@staticmethod
def input_ops(tab, content=None, clear_input: bool = True):
logger.debug('Input operation...')
input_area_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.QUERY_INPUT_ELE})
if clear_input:
input_area_ele.clear()  # clear the input box
if content is None:
content = "(OG=(Shanghai Jiao Tong University)) AND PY=(2025)"
input_area_ele.input(content)  # type the search query
@staticmethod
def search_ops(tab):
logger.debug('Search operation...')
search_button_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.SEARCH_BUTTON_ELE})
search_button_ele.click()
@staticmethod
def export_ops(tab, start: int = 1, end: int = 50):
tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_BUTTON_ELE}).click() # 点击导出
tab.ele('xpath:%(xpath)s' % {"xpath": settings.TABWIN_BUTTON_ELE}).click() # 选择制表符分割
# 等待弹框
# 切换导出格式选择全记录与参考文献
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_TYPE_SELECT_ELE}).click()
tab.ele('xpath:%(xpath)s' % {"xpath": settings.FULL_RECORD_REFERENCE_ELE}).click()
# Enter the record range
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_RANGE_ELE}).click()  # switch to range mode
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_START_ELE}).input(start, clear=True)
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_END_ELE}).input(end, clear=True)
# Click the final export button
tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_FILE_ELE}).click.to_download(
# save_path=DOWNLOAD_PATH,
rename='%s.txt' % 'savedrecs'
)
def first_ops(self):
tab = self.tab
self.operate_cookie_first(tab)
self.change_db(tab)
self.input_ops(tab)
self.search_ops(tab)
def bypass_ops(self):
tab = self.tab
self.export_ops(tab)
class CookieManager:
def __init__(
self,
redis_uri: str = "redis://localhost:6379/0",
cookie_lifetime: int = 60 * 60 * 4,  # cookie lifetime in seconds
check_interval: int = 60,
keep_browser_alive: bool = True,
):
self.url = "https://webofscience.clarivate.cn/wos/woscc/advanced-search"
self.cookie_lifetime = cookie_lifetime
# Redis connection
self.redis_key_prefix = 'cookie_pool:wos_sid'
self.check_interval = check_interval
self.redis_client = redis.Redis.from_url(
redis_uri,
decode_responses=True
)
logger.info(f"Redis连接成功: {redis_uri}")
self.dp_ins: Optional[DPOperations] = None
self.first = True
# Browser instance
self.browser = None
self.tab = None
self.keep_browser_alive = keep_browser_alive
self.current_sid = None
self.bypass_ok_tag = False
# Control flags
self._running = False
self._monitor_thread = None
def start_browser(self):
"""启动浏览器"""
if self.browser is None:
logger.info("启动浏览器...")
self.browser = Chromium()
self.tab = self.browser.latest_tab
logger.info("浏览器启动成功")
self.dp_ins = DPOperations(self.browser, self.tab)
def close_browser(self):
"""关闭浏览器"""
if self.browser:
logger.info("关闭浏览器...")
self.browser.quit()
self.browser = None
self.tab = None
logger.info("浏览器已关闭")
def refresh_page(self):
try:
logger.info("正在刷新页面")
if self.tab:
self.tab.refresh()
except Exception as e:
logger.error(f"正在刷新页面: {str(e)}")
def intercept_verify(self, op_func: Callable[[], None]):
"""
Every refresh or XHR operation should listen on the hCaptcha verification endpoint.
:return:
"""
logger.debug("监听 %s" % VERIFY_ROUTER)
self.tab.listen.start(VERIFY_ROUTER, method="POST") # 开启监听
op_func() # 指定操作方法
verify_count = 0
for packet in self.tab.listen.steps(count=3, timeout=60):
verify_count += 1
if self.verify_hook(packet):
# On successful verification return early and skip the logic below
return
if verify_count:
logger.warning("获取失败")
else:
logger.info("没有触发验证, cookie有效")
self.sid2redis()
@staticmethod
def get_wos_sid_from_localstorage(tab):
s = tab.local_storage('wos_sid')
sid = s.strip('"')
return sid
def get_cookie_from_browser(self):
try:
if self.tab is None:
self.start_browser()
if self.first:
logger.info(f"第一次访问页面: {self.url}")
self.tab.get(self.url)
time.sleep(3)  # wait for the page to load
# Run the custom operations
self.intercept_verify(op_func=self.dp_ins.first_ops)
time.sleep(2)
self.sid2redis()
except Exception as e:
logger.error(e)
def save_cookie_to_redis(self, wos_sid: str):
try:
current_time = datetime.now()
expired_time = current_time + timedelta(seconds=self.cookie_lifetime)
ip = get_self_ip()
cookie_data = {
'ip': ip,
'status': 'normal',
'generated_time': current_time.isoformat(),
'expired_time': expired_time.isoformat(),
'used_times': 0
}
self.redis_client.hset(
name=f'{self.redis_key_prefix}:{wos_sid}',
mapping=cookie_data
)
logger.info(f"Cookie已保存到Redis: {self.redis_key_prefix}:{wos_sid}")
except Exception as e:
(
logger.error(f"保存cookie到Redis失败: {str(e)}"))
def sid2redis(self):
"""
Store the current wos_sid to Redis.
:return:
"""
wos_sid = self.get_wos_sid_from_localstorage(self.tab)
if wos_sid:
logger.info("保存 %s 到redis..." % wos_sid)
self.current_sid = wos_sid
self.save_cookie_to_redis(wos_sid)
def verify_hook(self, packet: DataPacket):
verified_tag = 'verified'
request_url = packet.request.url
verify_success = False
if request_url.find(VERIFY_ROUTER) != -1:  # verification was triggered
logger.debug(f"Verifying: {request_url}\n"
f"Request body: {packet.request.postData}")
response_body = packet.response.body
if isinstance(response_body, bytes):
verify_success = packet.response.body.find(verified_tag.encode()) != -1
elif isinstance(response_body, str):
verify_success = packet.response.body.find(verified_tag) != -1
elif isinstance(response_body, dict):
verify_success = response_body.get('key') == verified_tag
elif isinstance(response_body, list) and len(response_body) > 0:
verify_success = response_body[0].get('key') == verified_tag
else:
raise TypeError("未知的response_body类型")
if verify_success:
logger.info(f"验证成功: {request_url}")
return True
else:
return False
else:
logger.info("无需验证")
return True
def check_cookie_status(self, sid: str = None, default_status: str = "expired"):
if sid is None:
sid = self.current_sid
if not sid:
return default_status
status = self.redis_client.hget(name=f'{self.redis_key_prefix}:{sid}', key='status')
return status or default_status  # fall back when the key is missing from Redis
def monitor_loop(self):
"""
Monitoring loop: periodically check the cookie status and react to it.
"""
logger.info(f"Starting cookie monitor, check interval: {self.check_interval}s")
while self._running:
try:
status = self.check_cookie_status()
if status == "validate":
logger.warning("cookie使用次数超限/需要验证,准备进行验证。。。")
# 验证逻辑,导出一次过验证
self.intercept_verify(op_func=self.dp_ins.bypass_ops)
elif status == "expired":
logger.warning("cookie已过期准备重新获取。。。")
# 刷新页面或者重新进行搜索/导出
self.intercept_verify(op_func=self.refresh_page)
else:
logger.info(f"Cookie状态正常: {status}")
# 等待下次检查
time.sleep(self.check_interval)
except Exception as e:
logger.error(e)
def start_monitor(self):
if self._running:
logger.warning("监控已在运行中")
return
if self.browser is None:
self.start_browser()
# Initial cookie acquisition
logger.info("Acquiring initial cookie...")
self.get_cookie_from_browser()
if self.current_sid:
logger.error("首次获取cookie成功")
else:
logger.error("首次获取cookie失败")
if not self.keep_browser_alive:
self.close_browser()
return
# Close the browser if keep-alive is not required
if not self.keep_browser_alive:
self.close_browser()
# Start the monitoring loop (threaded variant kept commented out below)
self._running = True
# self._monitor_thread = threading.Thread(
# target=self.monitor_loop,
# name="CookieMonitorThread",
# daemon=True
# )
# self._monitor_thread.start()
self.monitor_loop()
logger.info("监控已启动")
def stop_monitor(self):
"""停止监控"""
if not self._running:
logger.warning("监控未在运行")
return
logger.info("正在停止监控...")
self._running = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
self.close_browser()
logger.info("监控已停止")
def main():
manager = CookieManager(redis_uri="redis://:kcidea1509@192.168.1.211:6379/10", keep_browser_alive=True)
try:
manager.start_monitor()
# Main loop
logger.info("Cookie manager is running; press Ctrl+C to stop...")
except KeyboardInterrupt:
logger.info("收到停止信号")
# manager.close_browser()
finally:
manager.stop_monitor()
if __name__ == '__main__':
main()
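A small sketch, assuming the key names used above, of how the two components share state in Redis: CookieManager writes one hash per SID under cookie_pool:wos_sid:<sid>, while WosCookieMiddleware rotates SIDs through the cookie_pool:wos:sid_q list. This is only a way to inspect the pool manually, not part of the project:
import redis

r = redis.from_url("redis://localhost:6379/0", decode_responses=True)

# One hash per SID, written by CookieManager.save_cookie_to_redis()
for key in r.keys("cookie_pool:wos_sid:*"):
    print(key, r.hgetall(key))  # {'ip': ..., 'status': 'normal', 'used_times': ...}

# Rotation queue filled by WosCookieMiddleware.loading_sid_from_redis();
# rpoplpush(src, dst) with src == dst rotates the list without losing entries
print(r.lrange("cookie_pool:wos:sid_q", 0, -1))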

@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

@ -0,0 +1,110 @@
from typing import Any, List, Union
from datetime import datetime
import scrapy
from scrapy import signals
from scrapy.http import Response
from scrapy.http.request.json_request import JsonRequest
from .database import DatabaseSpider
from science_article_wos.items import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
from science_article_wos.scripts.wos_parse_data import parse_full_records
from science_article_wos.utils import model
from science_article_wos.utils import tools
from science_article_wos.utils import config
def maybe_list(val: Union[int, List[int]]) -> List[int]:
if isinstance(val, int):
return [val]
return list(val)
class DownloadBySearchRecordSpider(DatabaseSpider):
name = "download_by_search_record"
custom_settings = dict(
DOWNLOADER_MIDDLEWARES={
"science_article_wos.middlewares.WosCookieMiddleware": 500
},
# ITEM_PIPELINES={
# "science_article_wos.pipelines.MongoPipeline": 300,
# },
REDIS_URL='redis://:kcidea1509@192.168.1.211:6379/10',
LOG_LEVEL="INFO"
)
def spider_opened(self, spider):
if self.record_id is None:
# Pull a pending task from the database
from science_article_wos.dao.database.connection import DatabaseManager
from science_article_wos.dao.models.search_record import SearchRecord
db_url = ""
db_manager = DatabaseManager(db_url)
with db_manager.session_scope() as session:
record = session.query(SearchRecord).filter_by(state="pending").first()
if record:
print(f"查询到记录: {record}")
self.record_id = record.record_id
self.records_found = record.records_found
self.mark_from = record.mark_from
self.mark_to = record.mark_to
self.shard = record.shard
def __init__(self, record_id: str = None, mark_from: int = 1, mark_to: int = 500, shard: str | int = None, records_found: int = None, **kwargs):
super().__init__(**kwargs)
self.record_id = record_id
self.records_found = records_found
self.mark_from = mark_from
self.mark_to = mark_to
self.shard = shard
self.task_id = None
self.org_id = None
self.query_id = None
self.bind_relation_enable = False
self.bind_relation_d = None
if self.bind_relation_enable:
self.build_relation()
def build_relation(self):
bind_relation_d = dict()
# Collect into the local dict; self.bind_relation_d is still None at this point
if self.task_id: bind_relation_d.setdefault("task_ids", maybe_list(self.task_id))
if self.org_id: bind_relation_d.setdefault("school_ids", maybe_list(self.org_id))
if self.query_id: bind_relation_d.setdefault("query_ids", maybe_list(self.query_id))
self.bind_relation_d = bind_relation_d
return bind_relation_d
async def start(self):
query_id = self.record_id
records_found = self.records_found
mark_start = self.mark_from
mark_end = self.mark_to
yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
mark_to=mark_end),
callback=self.download_parse)
def download_parse(self, response: Response, **kwargs: Any) -> Any:
parse_count = 0
batch_time = datetime.now()
records = parse_full_records(response.body)
for data_dic in records:
t_id = data_dic.pop('ut', None)
if t_id:
parse_count += 1
article_item = WosArticleItem()
article_item['third_id'] = t_id
article_item['exported'] = data_dic
article_item['updated_at'] = batch_time
yield article_item
# Parse the citation count
if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
cited_item = WosCitedNumberItem()
cited_item['third_id'] = t_id
cited_item['cited'] = cited_num
cited_item['updated_at'] = batch_time
yield cited_item
if self.bind_relation_enable and self.bind_relation_d:
# Relations are only bound when bind_relation is enabled
relation_item = WosIdRelationItem()
relation_item['third_id'] = t_id
relation_item.update(**self.bind_relation_d)
yield relation_item

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# @Time : 2024/1/16 8:41
# @Author : zhaoxiangpeng
# @File : config.py
from datetime import datetime
# Data source name
SOURCE_NAME = 'wos'
WOS_SEARCH_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch"
WOS_DETAIL_LINK = 'https://webofscience.clarivate.cn/wos/woscc/full-record/{wos_id}'
WOS_DETAIL_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch'
WOS_ADVANCED_SEARCH_API = 'https://webofscience.clarivate.cn/api/wosnx/core/runQuerySearch'
WOS_EXPORT_FILE_API = 'https://webofscience.clarivate.cn/api/wosnx/indic/export/saveToFile'
WOS_RECORD_STREAM_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryGetRecordsStream"
WOS_REFINE_API = "https://webofscience.clarivate.cn/api/wosnx/core/runQueryRefine"
# WOS starter api
WOS_STARTER_DOCUMENT_UID_API = "https://api.clarivate.com/apis/wos-starter/v1/documents/{uid}" # Unique Identifier/Accession Number
WOS_STARTER_DOCUMENT_API = "https://api.clarivate.com/apis/wos-starter/v1/documents"
WOS_STARTER_PER_PAGE_LIMIT = 50  # per-page limit
# WOS lite api
WOS_LITE_QUERY_FIRST_API = 'https://wos-api.clarivate.com/api/woslite'  # the first request returns a query ID
WOS_LITE_QUERY_API = 'https://wos-api.clarivate.com/api/woslite/query'  # use the query ID to paginate
# Publications collection
WOS_ARTICLE_COLLECTION = 'data_{}_article'.format(SOURCE_NAME)
# Citation-count collection
WOS_CITED_NUMBER_COLLECTION = "relation_cited_number_{}".format(SOURCE_NAME)
# School-publication relation collection
SCHOOL_RELATION_COLLECTION = 'relation_school_{}'.format(SOURCE_NAME)
# References collection
WOS_REFERENCE_COLLECTION = "relation_reference_{}".format(SOURCE_NAME)
# Pending-download IDs collection
ARTICLE_TODO_IDS_COLLECTION = "todo_ids_{}".format(SOURCE_NAME)
# Publications collection for the CSCD edition
WOS_CSCD_ARTICLE_COLLECTION = 'data_{}_article_{}'.format(SOURCE_NAME, 'cscd')
# Cookie pool configuration
# COOKIE_POOL_CONFIG = dict(host=setting.REDIS_HOST, port=6379, db=setting.REDIS_DB, password=setting.REDIS_PASSWORD)
COOKIE_POOL_GROUP = 'cookies_pool:wos:sid*'
COOKIE_POOL_KEY = 'cookies_pool:wos:sid-sjtu'
COOKIE_TTL = 60 * 60 * 4
# Max number of records per downloaded file
BATCH_DOWNLOAD_LIMIT = 500
# Default record filter used when exporting files
DEFAULT_EXPORT_RECORD_FILTER = "fullRecordPlus"  # fullRecordPlus
# Table-header validation config
SUCCESS_TABLE_HEAD_START = b'\xef\xbb\xbfPT'
LOST_TABLE_HEAD_START = b'\xef\xbb\xbfnull'
AUTO_TABLE_HEAD_START = b'\xef\xbb\xbfPT\tAU\tBA\tBE\tGP\tAF\tBF\tCA\tTI\tSO\tSE\tBS\tLA\tDT\tCT\tCY\tCL\tSP\tHO\tDE\tID\tAB\tC1\tC3\tRP\tEM\tRI\tOI\tFU\tFP\tFX\tCR\tNR\tTC\tZ9\tU1\tU2\tPU\tPI\tPA\tSN\tEI\tBN\tJ9\tJI\tPD\tPY\tVL\tIS\tPN\tSU\tSI\tMA\tBP\tEP\tAR\tDI\tDL\tD2\tEA\tPG\tWC\tWE\tSC\tGA\tPM\tOA\tHC\tHP\tDA\tUT\r\n'
CORE_NAME_TABLE = dict(
WOSCC="Web of Science Core Collection",
BCI="BIOSIS Citation Index",
SCIELO="SciELO Citation Index",
RSCI="Russian Science Citation Index",
CSCD="Chinese Science Citation Database℠",
ARCI="Arabic Citation Index",
DIIDW="Derwent Innovations Index",
PPRN="",
PQDT="ProQuest ™ Dissertations & Theses Citation Index"
)
NAV_NAME_TABLE = dict(
SCI="Science Citation Index Expanded (SCI-Expanded)",
ESCI="Emerging Sources Citation Index (ESCI)",
SSCI="Social Sciences Citation Index (SSCI)",
ISTP="Conference Proceedings Citation Index Science (CPCI-S)",
BSCI="Book Citation Index Science (BKCI-S)",
AHCI="Arts & Humanities Citation Index (A&HCI)",
IC="Index Chemicus (IC)",
ISSHP="Conference Proceedings Citation Index Social Sciences & Humanities (CPCI-SSH)"
)
TASK_CONFIG = {
"school_id": 83,
"school_name": "北京林业大学",
"search_policy": """OG=(Beijing Forestry University)""",
"crawl_year": [2021, 2022, 2023],
"source_type": 1,
"priority": 10,
"is_important": 1,
"update_interval": 60 * 60 * 24 * 14,
"create_time": datetime.now(),
"last_time": datetime.now(),
"next_time": datetime.now(),
"state": 0
}
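The *_TABLE_HEAD_START constants above are byte prefixes (UTF-8 BOM plus the first column names) for checking a downloaded export file; a minimal sketch of how they might be used, where the helper name and return values are assumptions:
# Hedged sketch: classify a downloaded export by its byte prefix using the constants above.
def classify_export_head(payload: bytes) -> str:
    if payload.startswith(SUCCESS_TABLE_HEAD_START):  # BOM + "PT" -> normal tab-delimited export
        return "ok"
    if payload.startswith(LOST_TABLE_HEAD_START):     # BOM + "null" -> header row missing
        return "lost_head"
    return "unknown"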

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# @Time : 2025/12/11 13:56
# @Author : zhaoxiangpeng
# @File : crawl_article_by_qid.py
import math
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from science_article_wos.spiders.download_by_search_record import DownloadBySearchRecordSpider
BATCH_DOWNLOAD_LIMIT = 500
def f(record_id: str, records_found: int, shard_count: int = None):
mark_start = 1
mark_end = 0
idx = 0
shard_count = shard_count or math.ceil(records_found / BATCH_DOWNLOAD_LIMIT)
for i in range(shard_count):
idx += 1
mark_end += BATCH_DOWNLOAD_LIMIT
if mark_end > records_found:
mark_end = records_found
yield dict(
record_id=record_id,
mark_from=mark_start, mark_to=mark_end,
shard=idx, shard_count=shard_count,
records_found=records_found
)
mark_start += BATCH_DOWNLOAD_LIMIT
def ready():
"""
Insert the pending crawl tasks into the database.
:return:
"""
RECORDS_FOUND = 1486
def test_starter():
init_params = dict(
record_id='68ce1627-b4c3-4938-adcb-476c7dcde004-0192d3c012',
mark_from=1, mark_to=50,
shard=1, shard_count=51,
records_found=25256
)
process = CrawlerProcess(get_project_settings())
process.crawl(DownloadBySearchRecordSpider, **init_params)
process.start()
def starter():
process = CrawlerProcess(get_project_settings())
process.crawl(DownloadBySearchRecordSpider)
process.start()
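For reference, a worked example of the sharding generator f() above with BATCH_DOWNLOAD_LIMIT = 500 and the records_found value used in ready(); the record_id string is a placeholder:
# records_found = 1486 -> three shards:
#   shard 1: mark_from=1,    mark_to=500
#   shard 2: mark_from=501,  mark_to=1000
#   shard 3: mark_from=1001, mark_to=1486
for params in f("example-record-id", records_found=1486):
    print(params["shard"], params["mark_from"], params["mark_to"])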

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# @Time : 2025/12/15 16:47
# @Author : zhaoxiangpeng
# @File : search_records_orm.py
import math
from science_article_wos.dao.database.connection import DatabaseManager
from science_article_wos.dao.models.search_record import SearchRecord
BATCH_DOWNLOAD_LIMIT = 500
def f(record_id: str, records_found: int, shard_count: int = None):
mark_start = 1
mark_end = 0
idx = 0
shard_count = shard_count or math.ceil(records_found / BATCH_DOWNLOAD_LIMIT)
for i in range(shard_count):
idx += 1
mark_end += BATCH_DOWNLOAD_LIMIT
if mark_end > records_found:
mark_end = records_found
yield dict(
record_id=record_id,
mark_from=mark_start, mark_to=mark_end,
shard=idx, shard_count=shard_count,
records_found=records_found
)
mark_start += BATCH_DOWNLOAD_LIMIT
if __name__ == "__main__":
# Choose the connection string for your database type
# MySQL
db_url = "mysql+pymysql://root:admin000@localhost/crawler"
# SQLite
# db_url = "sqlite:///search_records.db"
# Initialize the database manager
db_manager = DatabaseManager(db_url)
# Create tables
db_manager.create_tables()
# Usage example
with db_manager.session_scope() as session:
# search_record_id = "02f30273-1342-4d61-9e51-c1ea1f5b2423-0190efdd10"
# for d in f(search_record_id, 10641):
# # Create a new record
# new_record = SearchRecord(
# **d
# )
#
# session.add(new_record)
# print(f"记录已添加: {new_record}")
# session.commit()
# Query a record
record = session.query(SearchRecord).filter_by(state="pending").first()
if record:
print(f"Found record: {record}")
# Update the record
if record:
record.state = "processing"
record.reason = "Processing data"
session.commit()
print(f"Record updated: {record}")