wos:scrapy使用dp下载

main
zhaoxiangpeng 3 weeks ago
parent bcbc59e659
commit d9e96bd3cc

@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
# @Time : 2025/11/25 14:44
# @Author : zhaoxiangpeng
# @File : wos_dp_download.py
from __future__ import annotations
import math
from datetime import datetime
from typing import TYPE_CHECKING, Generator
from scrapy_drissionpage.spider import DrissionSpider
from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem
from science_article_add.models.wos_model import get_record_info
from science_article_add.configs.wos_dp import settings as wos_dp_settings
from science_article_add.configs.wos import BATCH_DOWNLOAD_LIMIT
from science_article_add.utils import tools
from science_article_add.scripts.wos_parse_data import parse_full_records_txt
if TYPE_CHECKING:
from DrissionPage import ChromiumPage, ChromiumOptions
from scrapy_drissionpage.response import DrissionResponse
from DrissionPage._pages.chromium_tab import ChromiumTab
from DrissionPage._units.listener import DataPacket, Response
settings = wos_dp_settings
DOWNLOAD_PATH = r'Y:\wos-metadata\wos increment-202512\00'
class DpWosFileSpider(DrissionSpider):
name = "dp_wos_file"
start_urls = ["https://webofscience.clarivate.cn/wos/woscc/advanced-search"]
custom_settings = dict(
# 启用中间件
DOWNLOADER_MIDDLEWARES={
'scrapy_drissionpage.middleware.DrissionPageMiddleware': 543,
},
ITEM_PIPELINES={
"science_article_add.pipelines.mongo.MongoPipeline": 300,
},
EXTENSIONS={},
CONCURRENT_REQUESTS=1,
# DrissionPage配置
DRISSIONPAGE_HEADLESS=False, # 是否无头模式
DRISSIONPAGE_LOAD_MODE='normal', # 页面加载模式normal, eager, none
DRISSIONPAGE_DOWNLOAD_PATH='downloads', # 下载路径
DRISSIONPAGE_TIMEOUT=30, # 请求超时时间
DRISSIONPAGE_RETRY_TIMES=3, # 重试次数
DRISSIONPAGE_RETRY_INTERVAL=2, # 重试间隔(秒)
# 浏览器设置
DRISSIONPAGE_BROWSER_PATH=None, # 浏览器路径None使用默认浏览器
DRISSIONPAGE_INCOGNITO=True, # 是否使用无痕模式
DRISSIONPAGE_CHROME_OPTIONS=['--disable-gpu'], # Chrome启动选项}
)
_records_found = 0
_records_id = 0
query_content = "(OG=(Southwest University of Science & Technology - China)) AND PY=(2025)"
async def start(self):
yield self.drission_request(
url=self.start_urls[0],
callback=self.before_search,
page_type='chromium'
)
def before_search(self, response: DrissionResponse, **kwargs):
page: ChromiumPage = response.page # 重用页面
def operate_cookie_first():
# cookie管理处理
ck_m_div = page.ele('xpath://*[@id="onetrust-banner-sdk"]')
if ck_m_div:
ele = page.ele('xpath://*[@id="onetrust-accept-btn-handler"]')
ele.click()
operate_cookie_first() # cookie管理处理
# 切换数据库类型
page.ele('xpath://*[@id="snSelectDb"]/button').click()
page.ele('xpath:%(xpath)s' % {"xpath": settings.DB_CHANGE_ELE}).click()
# 开始检索流程
input_area_ele = page.ele('xpath:%(xpath)s' % {"xpath": settings.QUERY_INPUT_ELE})
input_area_ele.clear() # 清空
input_area_ele.input(self.query_content)
def listen_func():
page.listen.start(settings.SEARCH_ROUTE, method="POST")
def operation_func():
search_button_ele = page.ele('xpath:%(xpath)s' % {"xpath": settings.SEARCH_BUTTON_ELE})
search_button_ele.click()
def capture(packet: DataPacket):
search_url = page.url
record_id, records_found = get_record_info(packet.response.body)
self.set_records_found(records_found)
self.set_records_id(record_id)
if not self.get_records_id():
self.logger.warning('未找到记录 %s' % packet.response.body)
if records_found == 0:
self.logger.warning('检索式 "%s" 找到记录 %s' % (self.query_content, records_found))
return
else:
self.logger.info('检索式 "%s" 找到记录 %s' % (self.query_content, records_found))
return True
r = self.intercept(listen_func, operation=operation_func, callback=capture, tab=page)
print(r)
yield from self.download_records()
def before_download(self, response: DrissionResponse, **kwargs):
resp_meta = response.meta['wos_download_info']
g = self.rpa_download(
start=resp_meta['mark_start'],
end=resp_meta['mark_end'],
batch=resp_meta['batch_id'],
tab=self.current_tab
)
yield from g
def download_records(self):
for b in self.distribute_page():
query_id, batch_id, mark_start, mark_end = b
yield self.drission_request(
self.current_tab.url,
callback=self.before_download,
meta={'wos_download_info': dict(query_id=query_id, batch_id=batch_id, mark_start=mark_start,
mark_end=mark_end)}
)
# self.rpa_download(mark_start, mark_end, batch=batch_id, tab=self.current_tab)
def distribute_page(self):
# 计算页码
self.logger.info("prepare downloading...")
records_found = self.get_records_found()
query_id = self.get_records_id()
mark_start = 1
mark_end = 0
batch_id = 0
for i in range(math.ceil(records_found / BATCH_DOWNLOAD_LIMIT)):
mark_end += BATCH_DOWNLOAD_LIMIT
if mark_end > records_found:
mark_end = records_found
batch_id += 1
yield query_id, batch_id, mark_start, mark_end
mark_start += BATCH_DOWNLOAD_LIMIT
def rpa_download(self, start: int = 1, end: int = 500, batch: str | int = None, tab=None):
"""
点击下载前拦截api
"""
self.logger.debug("download starting...")
tab = tab or self.current_tab
tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_BUTTON_ELE}).click() # 点击导出
tab.ele('xpath:%(xpath)s' % {"xpath": settings.TABWIN_BUTTON_ELE}).click() # 选择制表符分割
# 等待弹框
# 切换导出格式选择全记录与参考文献
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_TYPE_SELECT_ELE}).click()
tab.ele('xpath:%(xpath)s' % {"xpath": settings.FULL_RECORD_REFERENCE_ELE}).click()
# 输入记录起止
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_RANGE_ELE}).click() # 切换到范围
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_START_ELE}).input(start, clear=True)
tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_END_ELE}).input(end, clear=True)
def listen_func():
tab.listen.start(settings.EXPORT_ROUTE, method="POST")
def operation_func():
tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_FILE_ELE}).click.to_download(
save_path=DOWNLOAD_PATH,
rename='%s.txt' % batch
)
def capture_packet(packet: DataPacket):
g = self._parse_download(packet.response)
yield from g
return self.intercept(listen=listen_func, operation=operation_func, callback=capture_packet, tab=tab)
def _parse_download(self, response: Response):
batch_time = datetime.now()
item_g = parse_full_records_txt(response.body.encode())
parse_count = 0
for data_dic in item_g:
t_id = data_dic.pop('ut', None)
if t_id:
parse_count += 1
article_item = WosArticleItem()
article_item['third_id'] = t_id
article_item['exported'] = data_dic
article_item['updated_at'] = batch_time
yield article_item
# 解析被引量
if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
cited_item = WosCitedNumberItem()
cited_item['third_id'] = t_id
cited_item['cited'] = cited_num
cited_item['updated_at'] = batch_time
yield cited_item
def intercept(self, listen, operation, callback, tab=None):
listen()
operation()
for packet in tab.listen.steps(count=3):
if not self.intercept_verify(packet):
continue
r = callback(packet)
if isinstance(r, Generator):
return r
else:
if isinstance(r, bool):
break
return
@staticmethod
def intercept_verify(packet: DataPacket):
content = packet.response.body
if isinstance(content, bytes) and content.find(b'"Server.passiveVerificationRequired"') != -1:
return False
else:
return True
def set_records_found(self, val):
self._records_found = val
def get_records_found(self) -> int:
return self._records_found
def set_records_id(self, val):
self._records_id = val
def get_records_id(self) -> str:
return self._records_id
if __name__ == '__main__':
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
process = CrawlerProcess(get_project_settings())
process.crawl(DpWosFileSpider)
process.start()
Loading…
Cancel
Save