Compare commits
17 Commits
9b9b14a383
...
a95f242bd5
| Author | SHA1 | Date |
|---|---|---|
|
|
a95f242bd5 | 1 month ago |
|
|
a97423e71f | 1 month ago |
|
|
7883a2d349 | 2 months ago |
|
|
d452c24baf | 2 months ago |
|
|
21c8179918 | 2 months ago |
|
|
7b97965a85 | 2 months ago |
|
|
efe077695d | 2 months ago |
|
|
8b2862774e | 2 months ago |
|
|
17b5253fde | 2 months ago |
|
|
14eea8c9d1 | 2 months ago |
|
|
abdad5b786 | 2 months ago |
|
|
12e9ed53a9 | 2 months ago |
|
|
e48c1cc704 | 2 months ago |
|
|
d9e96bd3cc | 2 months ago |
|
|
bcbc59e659 | 2 months ago |
|
|
b47caf1c59 | 2 months ago |
|
|
1b0da2c41e | 2 months ago |
@ -1,12 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="jdk" jdkName="pydevenv" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PyDocumentationSettings">
|
||||
<option name="format" value="PLAIN" />
|
||||
<option name="myDocStringFormat" value="Plain" />
|
||||
</component>
|
||||
</module>
|
||||
@ -0,0 +1,54 @@
|
||||
# Official slim Python base image
FROM python:3.11-slim

# Application working directory
WORKDIR /app

# System dependencies: download tools plus the shared libraries a
# Chrome/Chromium browser needs at runtime
RUN apt-get update && apt-get install -y \
    wget \
    curl \
    gnupg \
    ca-certificates \
    fonts-liberation \
    libasound2 \
    libatk-bridge2.0-0 \
    libatk1.0-0 \
    libatspi2.0-0 \
    libcups2 \
    libdbus-1-3 \
    libdrm2 \
    libgbm1 \
    libgtk-3-0 \
    libnspr4 \
    libnss3 \
    libxcomposite1 \
    libxdamage1 \
    libxfixes3 \
    libxrandr2 \
    xdg-utils \
    --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*

# Install the Google Chrome browser (if needed)
# NOTE(review): apt-key is deprecated on current Debian releases — consider
# the signed-by keyring approach; command kept as-is.
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first so this layer caches independently of code
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Unbuffered stdout/stderr so container logs appear immediately
ENV PYTHONUNBUFFERED=1

# Run the application
CMD ["python", "app.py"]
|
||||
@ -0,0 +1,10 @@
|
||||
requests~=2.32.4
|
||||
scrapy~=2.13.3
|
||||
pymongo~=4.13.0
|
||||
itemadapter~=0.11.0
|
||||
happybase~=1.2.0
|
||||
fastapi~=0.116.1
|
||||
redis~=6.2.0
|
||||
parsel~=1.10.0
|
||||
sympy~=1.14.0
|
||||
pydantic~=2.0.3
|
||||
@ -1,12 +0,0 @@
|
||||
from scrapy.crawler import CrawlerProcess
|
||||
from scrapy.utils.project import get_project_settings
|
||||
|
||||
from science_article_add.scripts.get_db_task import TaskManager
|
||||
|
||||
tm = TaskManager()
|
||||
process = CrawlerProcess(get_project_settings())
|
||||
|
||||
task = tm.get_task_from_mysql()
|
||||
|
||||
process.crawl('wos_latest_increment', task_obj=task)
|
||||
process.start()
|
||||
@ -0,0 +1,296 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/11/24 09:25
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : wos_search_export.py
|
||||
import math
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
from datetime import datetime
|
||||
|
||||
import redis
|
||||
from DrissionPage import Chromium
|
||||
from DrissionPage import ChromiumPage, ChromiumOptions
|
||||
from DrissionPage._pages.chromium_tab import ChromiumTab
|
||||
from DrissionPage._units.listener import DataPacket, Response
|
||||
from DrissionPage.errors import ElementNotFoundError
|
||||
|
||||
from science_article_add.utils import tools
|
||||
from science_article_add.scripts.wos_parse_data import parse_full_records_txt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
LINK = "https://webofscience.clarivate.cn/wos/woscc/advanced-search"
|
||||
BATCH_DOWNLOAD_LIMIT = 500
|
||||
|
||||
|
||||
class Settings:
    """XPath selectors and API routes for the WoS advanced-search export flow
    (dev / English-language UI); see ProSettings for localized overrides."""
    env = "dev"
    # Backend routes observed in the page's network traffic.
    SEARCH_ROUTE = '/api/wosnx/core/runQuerySearch'
    EXPORT_ROUTE = '/api/wosnx/indic/export/saveToFile'
    # Database-selector entry for "Web of Science Core Collection".
    DB_CHANGE_ELE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science Core Collection"]'
    QUERY_INPUT_ELE = '//*[@id="advancedSearchInputArea"]'
    SEARCH_BUTTON_ELE = '//button[@data-ta="run-search"]/span[@class="mat-mdc-button-touch-target"]'

    EXPORT_BUTTON_ELE = '//*[@id="export-trigger-btn"]'
    TABWIN_BUTTON_ELE = '//*[@id="exportToTabWinButton"]'  # "tab-delimited file" button

    RECORD_TYPE_SELECT_ELE = '//div[@class="ng-star-inserted"]/wos-select/button[@aria-haspopup="listbox"]'  # record-content dropdown
    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record"]'  # "Full Record" option
    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="Full Record and Cited References"]'  # full record + cited references

    RECORD_RANGE_ELE = '//*[@id="radio3-input"]'  # "records from ... to ..." radio
    RECORD_EXPORT_START_ELE = '//input[@name="markFrom"]'
    RECORD_EXPORT_END_ELE = '//input[@name="markTo"]'

    EXPORT_FILE_ELE = '//*[@id="exportButton"]'

    # Sample advanced-search expression used for manual testing.
    INPUT_CONTENT = '(OG=(Anhui University of Science & Technology)) AND PY=(2025)'
|
||||
|
||||
|
||||
class ProSettings(Settings):
    """Selector overrides for the production (Chinese-localized) WoS UI.

    Only element titles that are localized differ from the base ``Settings``.
    """
    # Fix: the base class (and all call sites) read DB_CHANGE_ELE, so the
    # override must use that name — the previous DB_CHANGE was never consulted.
    DB_CHANGE_ELE = '//*[@id="global-select"]/div/div[@aria-label="Select database"]/div[@title="Web of Science 核心合集"]'
    DB_CHANGE = DB_CHANGE_ELE  # backward-compatible alias for the old name
    # Fix: '//botton[...]' was a tag typo and could never match; the element
    # is a <button>.
    EXPORT_BUTTON_ELE = '//button[@id="export-trigger-btn"]'
    FULL_RECORD_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="完整记录"]'  # "Full Record" (zh)
    FULL_RECORD_REFERENCE_ELE = '//div[@id="global-select"]//div[@class="options options-menu"]/div[@title="全记录与引用的参考文献"]'  # full record + cited references (zh)
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
|
||||
class WosSearchExport:
    """Browser automation (DrissionPage) for the WoS advanced-search export flow.

    Workflow: ``execute()`` -> one-time ``_initialize()`` (open page, accept the
    cookie banner, switch database) -> ``input_query()`` runs the search and
    captures the server-side query id / record count from the search API ->
    ``download_records()`` clicks through the export dialog once per batch of
    ``BATCH_DOWNLOAD_LIMIT`` records.
    """

    _records_found = 0
    inited: bool = False
    is_running = False

    def __init__(self, query_content: Any, options=None):
        # Total hits reported by the search API; filled in by input_query().
        self._records_found = 0
        # Server-side query id from the search API; required for exporting.
        self._query_id = None
        self.query_content = query_content
        # ChromiumOptions instance (or None for defaults), passed to Chromium().
        self.options = options

    @classmethod
    def create_instance(cls, config: dict):
        """Alternate constructor: build an instance from a plain config dict."""
        return cls(
            query_content=config.get("query_content"),
            options=config.get('options')
        )

    def set_records_found(self, val):
        self._records_found = val

    def get_records_found(self) -> int:
        return self._records_found

    def set_query_id(self, query_id):
        self._query_id = query_id

    def get_query_id(self):
        return self._query_id

    def _initialize(self):
        """One-time page setup; execute() guards this with ``self.inited``."""
        self.browser = Chromium(self.options)
        self.tab = self.browser.latest_tab
        self.open_url(LINK)
        # Handle the cookie-consent banner before interacting with the page.
        self.operate_cookie_first()
        self.change_db()
        self.inited = True

    def open_url(self, url):
        logger.debug('Opening url: %s' % url)
        self.tab.get(url)

    def operate_cookie_first(self):
        """Accept the OneTrust cookie banner when it is shown."""
        logger.debug('Operating cookie first...')
        ck_m_div = self.tab.ele('xpath://*[@id="onetrust-banner-sdk"]')
        if ck_m_div:
            ele = self.tab.ele('xpath://*[@id="onetrust-accept-btn-handler"]')
            ele.click()

    def change_db(self):
        """Switch the database dropdown to the Core Collection entry."""
        logger.info('Changing database...')
        default_db_ele = self.tab.ele('xpath://*[@id="snSelectDb"]/button')
        default_db_ele.click()  # open the dropdown (unused raw_text read removed)
        self.tab.ele(
            'xpath:%(xpath)s' % {"xpath": settings.DB_CHANGE_ELE}).click()

    def input_query(self, content: str, clear_input: bool = True, tab=None):
        """Enter the search expression, run it, and capture the query id and
        record count from the intercepted search API response.

        :param content: WoS advanced-search expression.
        :param clear_input: clear the input area before typing.
        :param tab: optional tab override; defaults to ``self.tab``.
        """
        tab = tab or self.tab
        input_area_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.QUERY_INPUT_ELE})
        if clear_input:
            input_area_ele.clear()  # drop any previous expression

        input_area_ele.input(content)  # type the search expression

        def listen_func():
            tab.listen.start(settings.SEARCH_ROUTE, method="POST")

        def operation_func():
            search_button_ele = tab.ele('xpath:%(xpath)s' % {"xpath": settings.SEARCH_BUTTON_ELE})
            search_button_ele.click()

        def capture_packet(packet: DataPacket):
            record_id, records_found = self.get_record_info(packet.response.body)
            self.set_records_found(records_found)
            self.set_query_id(record_id)
            if not self.get_query_id():
                logger.warning('未找到记录 %s' % packet.response.body)

            if records_found == 0:
                logger.warning('检索式 "%s" 找到记录 %s 条' % (self.query_content, records_found))
                return  # falsy: keep listening for further packets
            else:
                logger.info('检索式 "%s" 找到记录 %s 条' % (self.query_content, records_found))

            return True  # truthy: stop intercepting

        self.intercept(listen=listen_func, operation=operation_func, callback=capture_packet, tab=tab)

    def download_records(self):
        """Export every batch produced by distribute_page()."""
        for b in self.distribute_page():
            query_id, batch_id, mark_start, mark_end = b
            self.rpa_download(mark_start, mark_end, batch=batch_id, tab=self.tab)

    def distribute_page(self):
        """Yield (query_id, batch_id, mark_start, mark_end) for each export
        batch; the final batch's end mark is clamped to the total count."""
        logger.info("prepare downloading...")
        records_found = self.get_records_found()
        query_id = self.get_query_id()
        mark_start = 1
        mark_end = 0
        batch_id = 0
        for i in range(math.ceil(records_found / BATCH_DOWNLOAD_LIMIT)):
            mark_end += BATCH_DOWNLOAD_LIMIT
            if mark_end > records_found:
                mark_end = records_found
            batch_id += 1
            yield query_id, batch_id, mark_start, mark_end

            mark_start += BATCH_DOWNLOAD_LIMIT

    def clear_query(self):
        # TODO: reset the search input between queries.
        pass

    def reflush_query(self):
        # TODO: re-run the current query after a page problem.
        pass

    def reflush_page(self):
        # TODO: reload the page to recover from missing elements.
        pass

    def rpa_download(self, start: int = 1, end: int = 500, batch: str | int = None, tab=None):
        """
        Fill the export dialog for records [start, end], intercept the export
        API call, and save the response as '<batch>.txt'.
        """
        try:
            logger.debug("download starting...")
            tab = tab or self.tab
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_BUTTON_ELE}).click()  # open export menu
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.TABWIN_BUTTON_ELE}).click()  # choose tab-delimited file
            # In the dialog: switch the record-content dropdown to
            # "Full Record and Cited References".
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_TYPE_SELECT_ELE}).click()
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.FULL_RECORD_REFERENCE_ELE}).click()

            # Enter the record range.
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_RANGE_ELE}).click()  # switch to "records from/to"
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_START_ELE}).input(start, clear=True)
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.RECORD_EXPORT_END_ELE}).input(end, clear=True)
        except ElementNotFoundError:
            # Fix: on a broken page, bail out instead of falling through and
            # clicking the export button anyway (the original kept going).
            self.reflush_page()
            return

        def listen_func():
            tab.listen.start(settings.EXPORT_ROUTE, method="POST")

        def operation_func():
            # NOTE(review): DOWNLOAD_PATH is only defined under the __main__
            # guard; calling rpa_download() from an importing module raises
            # NameError — confirm and move the path into configuration.
            tab.ele('xpath:%(xpath)s' % {"xpath": settings.EXPORT_FILE_ELE}).click.to_download(
                save_path=DOWNLOAD_PATH,
                rename='%s.txt' % batch
            )

        def capture_packet(packet: DataPacket):
            g = self._parse_download(packet.response)
            for i in g:
                print(i)
            return True

        self.intercept(listen=listen_func, operation=operation_func, callback=capture_packet, tab=tab)

    def intercept(self, listen, operation, callback, tab=None):
        """Start listening, perform the operation, then feed up to 3 captured
        packets to ``callback`` until it returns truthy (or packets run out)."""
        listen()
        operation()
        for packet in tab.listen.steps(count=3):
            # Was a bare print(); keep the raw body on the debug channel only.
            logger.debug(packet.response.body)
            if not self.intercept_verify(packet):
                continue
            r = callback(packet)
            if r:
                break
        return

    @staticmethod
    def intercept_verify(packet: DataPacket):
        """Reject packets that carry the site's passive-verification challenge."""
        content = packet.response.body
        if isinstance(content, bytes) and content.find(b'"Server.passiveVerificationRequired"') != -1:
            return False
        else:
            return True

    def _parse_download(self, response: Response):
        """Yield article dicts (and cited-count dicts) parsed from an export
        response body."""
        batch_time = datetime.now()
        item_g = parse_full_records_txt(response.body.encode())
        parse_count = 0
        for data_dic in item_g:
            t_id = data_dic.pop('ut', None)
            if t_id:
                parse_count += 1
                yield dict(third_id=t_id, exported=data_dic, updated_at=batch_time)
                # Times-cited count, when present.
                if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
                    yield dict(third_id=t_id, cited=cited_num, updated_at=batch_time)

    @staticmethod
    def get_record_info(body: bytes):
        """Extract (query_id, records_found) from the newline-delimited JSON
        body of the search API response; returns (None, 0) when absent."""
        resp_texts = body.strip().split(b'\n')
        query_id = None
        records_found = 0
        for resp_text in resp_texts:
            resp_row_dict: dict = json.loads(resp_text)
            if resp_row_dict.get("key") == "searchInfo":
                query_id = resp_row_dict.get("payload", {}).get("QueryID")
                records_found = resp_row_dict.get("payload", {}).get("RecordsFound")
                break  # first searchInfo row wins
        return query_id, records_found

    def execute(self):
        """Initialize on first use, run the query, then download all batches."""
        if not self.inited:
            logger.info('初始化页面')
            self._initialize()
        self.input_query(self.query_content)
        self.download_records()

    def start(self):
        pass

    def stop(self):
        self.tab.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
DOWNLOAD_PATH = r'Y:\wos-metadata\wos increment-202512\00'
|
||||
conf = dict(
|
||||
query_content="(OG=(Southwest University of Science & Technology - China)) AND PY=(2025)",
|
||||
download_dir=DOWNLOAD_PATH
|
||||
)
|
||||
co = ChromiumOptions() # .headless()
|
||||
co.set_pref('download.default_directory', conf['download_dir'])
|
||||
conf['options'] = co
|
||||
|
||||
ins = WosSearchExport.create_instance(config=conf)
|
||||
ins.execute()
|
||||
@ -0,0 +1,139 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/10/31 10:24
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : distribute_task.py
|
||||
|
||||
from typing import Any
|
||||
import pymysql
|
||||
from science_article_add.utils import tools
|
||||
|
||||
# NOTE(review): all of these statements are filled with %-string interpolation,
# not bound parameters — vulnerable to SQL injection if any value is ever
# user-supplied. Safe only while inputs stay internal.
SELECT_STRATEGY_SQL = '''SELECT
r.org_id, q.id, q.content, q.param, q.disable_flag, q.state
FROM relation_org_query AS r JOIN task_search_strategy AS q
ON r.query_id = q.id
WHERE
r.org_name="%(org_name)s" AND disable_flag = 0'''
# Inserts one batch record; %(task_condition)s must already be SQL-quoted
# by the caller (callers pass e.g. '"AND PY=(2025-2026)"' or 'NULL').
CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)'''

# The %(...)s slots here double as column names — fill them with the names
# from ORG_STRATEGY_FIELDS, plus q_id for the strategy id.
ORG_STRATEGY_SQL = """
SELECT r.%(org_id)s, r.%(org_name)s, r.%(query_id)s, q.%(content)s, q.%(source_type)s
FROM task_search_strategy AS q JOIN relation_org_query AS r ON r.query_id = q.id
WHERE q.id = %(q_id)s
"""
ORG_STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']
|
||||
|
||||
|
||||
class CrawlTaskManager:
    """Create and fetch WoS crawl-task records in the MySQL task database."""

    def __init__(self):
        # NOTE(review): hard-coded production credentials — move to
        # configuration/environment before this code circulates further.
        self.client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
                                                          database='science_data_dept', user='science-data-dept',
                                                          passwd='datadept1509', )

    def execute_sql(self, sql):
        """Run a raw SQL statement and return all rows; the cursor is always
        closed (errors propagate to the caller)."""
        cursor = self.client.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()

    def find_task_by_school_name(self, school_name, source_type: int = None):
        """Return enabled strategy rows for *school_name* (and optional
        source_type), or None on database error.

        Fixes over the original: the SQL contained a malformed '%()s'
        placeholder (every call raised and returned None), parameters were
        never bound, the cursor leaked, errors were silently swallowed, and
        nothing was returned. Uses bound parameters to avoid SQL injection.
        """
        select_sql = (
            'select q.id, q.content, q.disable_flag, q.state, r.org_id '
            'from task_search_strategy as q join relation_org_query as r on q.id = r.query_id '
            'where r.org_name = %s and q.source_type = %s and q.disable_flag = 0'
        )
        cursor = self.client.cursor()
        try:
            cursor.execute(select_sql, (school_name, source_type))
            return cursor.fetchall()
        except pymysql.MySQLError as e:
            # Best-effort like the sibling methods: report and return None.
            print(e)
            return None
        finally:
            cursor.close()

    def create_crawler_task(self, query_id: int, condition: Any = None, source_type: int = None):
        """Insert a task_batch_record row for today's batch.

        :returns: the new row id, or None on database error.
        NOTE(review): CREATE_RECORD_SQL is %-interpolated, so ``condition``
        must already be SQL-quoted by the caller — do not pass untrusted input.
        """
        cursor = self.client.cursor()
        try:
            insert_sql = CREATE_RECORD_SQL % {
                'batch_date': tools.get_today_date(),
                'query_id': query_id,
                'task_condition': condition
            }
            cursor.execute(
                insert_sql
            )
            cursor.connection.commit()
            return cursor.lastrowid
        except pymysql.MySQLError as e:
            print(e)
            return None
        finally:
            cursor.close()

    def get_crawler_task(self, task_id: int = None, source_type: int = None, state: int = None):
        """Fetch one task_batch_record (optionally by id / done-state) merged
        with its strategy and organisation columns.

        :returns: merged dict, or None when no matching record exists.
        """
        STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']
        cursor = self.client.cursor()
        try:
            record_fields = ['id', 'batch_date', 'query_id', 'task_condition', 'is_done']
            condition = {}
            # Default to not-yet-done records when no state is given.
            if state is not None:
                condition['is_done'] = state
            else:
                condition['is_done'] = 0
            if task_id:
                condition['id'] = task_id

            sql = "select %(fields)s from task_batch_record where %(condition)s" % {
                'fields': ', '.join(record_fields), 'condition': ' and '.join([f'{k}={v}' for k, v in condition.items()])
            }
            if source_type:
                pass  # TODO: also filter by strategy source_type
            cursor.execute(sql)
            result = cursor.fetchone()
            if result is None:
                return
            task_record_dic = dict(zip(record_fields, result))
            # ORG_STRATEGY_SQL's %(...)s slots double as column names: map each
            # field name to itself, then add the actual query-id parameter.
            fill = dict(zip(STRATEGY_FIELDS, STRATEGY_FIELDS))
            fill.update(q_id=task_record_dic.get("query_id"))
            cursor.execute(
                ORG_STRATEGY_SQL % fill,
            )
            result = cursor.fetchone()
            task_dic = dict(zip(STRATEGY_FIELDS, result))
            task_dic.update(task_record_dic)
            return task_dic
        finally:
            cursor.close()

    def _build_condition(self, source_type: int = None):
        # TODO(review): unfinished stub — the condition string is built but
        # never returned or used, and the PY=() year placeholder is empty.
        if source_type is None:
            source_type = 1
        if source_type == 1:
            condition = 'AND PY=()'
|
||||
|
||||
|
||||
# def test_create_one():
|
||||
# manager = CrawlTaskManager()
|
||||
# manager.create_crawler_task(1542, condition='NULL', source_type=1)
|
||||
|
||||
|
||||
def main():
    """Queue up to 20 pending WoS search strategies as crawl-task records.

    Prints each created record id, the affected strategy ids, and the UPDATE
    statement that would mark those strategies as queued (not executed here).
    """
    manager = CrawlTaskManager()
    rows = manager.execute_sql('select id from task_search_strategy where disable_flag=0 and source_type=1 and state=0 limit 20')
    query_ids = []
    for row in rows:
        strategy_id = row[0]
        new_record_id = manager.create_crawler_task(strategy_id, condition='"AND PY=(2025-2026)"', source_type=1)
        query_ids.append(strategy_id)
        print(new_record_id)
    changed = [str(sid) for sid in query_ids]
    print(changed)
    ok = 'update task_search_strategy set state=1 where id in (%s)' % ', '.join(changed)
    print(ok)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -0,0 +1,95 @@
|
||||
from typing import Any, List, Union
|
||||
from datetime import datetime
|
||||
import scrapy
|
||||
from scrapy.http import Response
|
||||
from scrapy.http.request.json_request import JsonRequest
|
||||
from scrapy.crawler import Crawler
|
||||
|
||||
from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
|
||||
from science_article_add.scripts.wos_parse_data import parse_full_records
|
||||
from science_article_add.models import wos_model as model
|
||||
from science_article_add.utils import tools
|
||||
from science_article_add.configs import wos as config
|
||||
|
||||
|
||||
def maybe_list(val: Union[int, List[int]]) -> List[int]:
    """Normalize *val* to a list of ints: wrap a bare int, copy any iterable."""
    return [val] if isinstance(val, int) else list(val)
|
||||
|
||||
|
||||
class DownloadByQidSpider(scrapy.Spider):
    """Export one batch (mark_from..mark_to) of WoS records for a saved
    server-side query id and yield article / cited-count / relation items."""

    name = "download_by_qid"

    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={
            "science_article_add.middlewares.wos.WosSidParamMiddleware": 500
        },
        ITEM_PIPELINES={
            "science_article_add.pipelines.mongo.MongoPipeline": 300,
        },
        LOG_LEVEL="INFO"
    )

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        return super().from_crawler(crawler, *args, **kwargs)

    def __init__(self, record_id: str, mark_from: int = 1, mark_to: int = 500, records_found: int = None, **kwargs):
        super().__init__()
        self.record_id = record_id
        self.records_found = records_found
        self.mark_from = mark_from
        self.mark_to = mark_to
        # Optional ids used to bind downloaded articles to tasks/orgs/queries.
        self.task_id = None
        self.org_id = None
        self.query_id = None
        self.bind_relation_enable = False
        self.bind_relation_d = None
        if self.bind_relation_enable:
            self.build_relation()

    def build_relation(self):
        """Build and store the id-binding dict from task/org/query ids.

        Fix: the original called setdefault() on ``self.bind_relation_d``,
        which is still None at this point (AttributeError) — populate the
        local dict instead, then assign it.
        """
        bind_relation_d = dict()
        if self.task_id:
            bind_relation_d.setdefault("task_ids", maybe_list(self.task_id))
        if self.org_id:
            bind_relation_d.setdefault("school_ids", maybe_list(self.org_id))
        if self.query_id:
            bind_relation_d.setdefault("query_ids", maybe_list(self.query_id))
        self.bind_relation_d = bind_relation_d
        return bind_relation_d

    async def start(self):
        """Issue the export request for this batch's record range."""
        query_id = self.record_id
        records_found = self.records_found
        mark_start = self.mark_from
        mark_end = self.mark_to
        yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
                          data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
                                                               mark_to=mark_end),
                          callback=self.download_parse)

    def download_parse(self, response: Response, **kwargs: Any) -> Any:
        """Parse the exported text into items: one article item per record,
        plus a cited-count item and (when enabled) an id-relation item."""
        parse_count = 0
        batch_time = datetime.now()
        records = parse_full_records(response.body)
        for data_dic in records:
            t_id = data_dic.pop('ut', None)
            if t_id:
                parse_count += 1
                article_item = WosArticleItem()
                article_item['third_id'] = t_id
                article_item['exported'] = data_dic
                article_item['updated_at'] = batch_time
                yield article_item
                # Times-cited count, when present.
                if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
                    cited_item = WosCitedNumberItem()
                    cited_item['third_id'] = t_id
                    cited_item['cited'] = cited_num
                    cited_item['updated_at'] = batch_time
                    yield cited_item
                if self.bind_relation_enable and self.bind_relation_d:
                    # Relations are only bound when explicitly enabled.
                    relation_item = WosIdRelationItem()
                    relation_item['third_id'] = t_id
                    relation_item.update(**self.bind_relation_d)
                    yield relation_item
|
||||
@ -0,0 +1,43 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/12/11 13:56
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : crawl_article_by_qid.py
|
||||
import math
|
||||
from scrapy.crawler import CrawlerProcess
|
||||
from scrapy.utils.project import get_project_settings
|
||||
from science_article_add.spiders.download_by_qid import DownloadByQidSpider
|
||||
|
||||
BATCH_DOWNLOAD_LIMIT = 500
|
||||
|
||||
process = CrawlerProcess(get_project_settings())
|
||||
RECORDS_FOUND = 1486
|
||||
wos_download_todo = [
|
||||
|
||||
]
|
||||
|
||||
|
||||
def f(record_id: str, records_found: int, batch_size: int = None):
    """Yield per-batch spider kwargs covering records 1..records_found.

    :param record_id: the server-side query id to download from.
    :param records_found: total number of hits for the query.
    :param batch_size: records per batch; defaults to BATCH_DOWNLOAD_LIMIT
        (generalized from the previously hard-coded constant).
    """
    step = batch_size or BATCH_DOWNLOAD_LIMIT
    mark_start = 1
    mark_end = 0
    idx = 0
    for _ in range(math.ceil(records_found / step)):
        idx += 1
        mark_end += step

        if mark_end > records_found:
            mark_end = records_found  # clamp the final partial batch

        yield dict(
            record_id=record_id, batch=idx,
            mark_from=mark_start, mark_to=mark_end, records_found=records_found
        )

        mark_start += step
|
||||
|
||||
|
||||
# Example run: download the first 500 of 10641 records for a captured query id.
init_params = dict(
    record_id='02f30273-1342-4d61-9e51-c1ea1f5b2423-0190efdd10',
    mark_from=1, mark_to=500, records_found=10641
)
process.crawl(DownloadByQidSpider, **init_params)
process.start()
|
||||
@ -0,0 +1,41 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/12/11 17:07
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : crawl_article_by_ut.py
|
||||
import math
|
||||
import time
|
||||
import logging
|
||||
from twisted.internet import defer
|
||||
from scrapy.crawler import CrawlerProcess
|
||||
from scrapy.utils.project import get_project_settings
|
||||
from science_article_add.spiders.wos_download import WosDownloadSpider
|
||||
|
||||
logging.getLogger('pymongo').setLevel(logging.WARNING)
|
||||
logger = logging.getLogger(__name__)
|
||||
BATCH_DOWNLOAD_LIMIT = 500
|
||||
|
||||
|
||||
@defer.inlineCallbacks
def crawl_sequentially():
    """Run WosDownloadSpider repeatedly until no pending ids remain, then stop
    the CrawlerProcess.

    Fixes: ``time.sleep(60)`` inside an inlineCallbacks generator blocked the
    Twisted reactor (and every in-flight request) — replaced with a
    non-blocking ``task.deferLater``; the Mongo client is now closed.
    """
    from twisted.internet import reactor, task
    settings = get_project_settings()
    from pymongo import MongoClient
    client = MongoClient(settings.get("MONGO_URI"))
    db = client.get_database(settings.get("MONGO_DATABASE"))
    collection = db.get_collection("todo_ids_wos")

    def pending_count():
        # Number of ids not yet downloaded.
        return collection.count_documents(filter={"state": 0})

    try:
        while count_doc := pending_count():
            logger.info('待下载数量 %d' % count_doc)
            yield process.crawl(WosDownloadSpider)
            # Non-blocking 60-second pause between passes.
            yield task.deferLater(reactor, 60, lambda: None)
    finally:
        client.close()
        process.stop()  # shut down the event loop after all crawls finish
|
||||
|
||||
|
||||
if __name__ == '__main__':
    process = CrawlerProcess(get_project_settings())
    # Schedules the sequential crawl chain on the reactor...
    crawl_sequentially()
    process.start()  # ...then blocks until every crawl has finished
|
||||
@ -0,0 +1,30 @@
|
||||
from typing import Any, List, Dict, Self, AsyncIterator
|
||||
|
||||
import scrapy
|
||||
from scrapy.crawler import Crawler
|
||||
from science_article_cnki.models import cnki_model as model
|
||||
from science_article_cnki.configs import cnki as config
|
||||
|
||||
|
||||
class CnkiIdsDownloadSpider(scrapy.Spider):
    """Export CNKI article metadata (legacy XLS endpoint) for a list of ids."""

    name = "cnki_ids_download"
    allowed_domains = ["cnki.net"]
    start_urls = ["https://cnki.net"]

    @classmethod
    def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
        return super().from_crawler(crawler, *args, **kwargs)

    def __init__(self):
        super().__init__()
        # Ids to export; expected to be populated before the crawl starts.
        self.id_list: List[Dict[str, str]] = []

    async def start(self):
        """POST the export request for the configured id list.

        Fix: the original referenced an undefined name ``ids`` (NameError) —
        use the instance's id_list instead.
        """
        yield scrapy.FormRequest(
            config.CNKI_EXPORT_XLS_OLD_API,
            method='POST',
            formdata=model.export_data(self.id_list),
        )

    def parse(self, response):
        # TODO: handle the export response.
        pass
|
||||
@ -0,0 +1,8 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2026/1/12 14:31
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : logformat.py
|
||||
|
||||
|
||||
def pformat_dict(**kwargs):
    """Render keyword arguments as a ``k=v, k=v`` string for log messages."""
    return ', '.join(f'{key}={value}' for key, value in kwargs.items())
|
||||
@ -0,0 +1,3 @@
|
||||
sqlalchemy~=1.3.24
|
||||
scrapy~=2.13.3
|
||||
itemadapter~=0.11.0
|
||||
@ -0,0 +1,13 @@
|
||||
# my_scrapy_project/models/base.py
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import Column, Integer, DateTime
|
||||
from datetime import datetime
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class BaseModel(Base):
    """Abstract declarative base: every concrete model inherits an
    auto-increment integer surrogate primary key."""
    __abstract__ = True  # no table is created for this class itself

    # Surrogate primary key shared by all concrete models.
    id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
@ -0,0 +1,4 @@
|
||||
# This package will contain the spiders of your Scrapy project
|
||||
#
|
||||
# Please refer to the documentation for information on how to create and manage
|
||||
# your spiders.
|
||||
@ -0,0 +1,110 @@
|
||||
from typing import Any, List, Union
|
||||
from datetime import datetime
|
||||
import scrapy
|
||||
from scrapy import signals
|
||||
from scrapy.http import Response
|
||||
from scrapy.http.request.json_request import JsonRequest
|
||||
|
||||
from .database import DatabaseSpider
|
||||
from science_article_wos.items import WosArticleItem, WosCitedNumberItem, WosIdRelationItem
|
||||
from science_article_wos.scripts.wos_parse_data import parse_full_records
|
||||
from science_article_wos.utils import model
|
||||
from science_article_wos.utils import tools
|
||||
from science_article_wos.utils import config
|
||||
|
||||
|
||||
def maybe_list(val: Union[int, List[int]]) -> List[int]:
    """Normalize *val* to a list of ints: wrap a bare int, copy any iterable."""
    return [val] if isinstance(val, int) else list(val)
|
||||
|
||||
|
||||
class DownloadBySearchRecordSpider(DatabaseSpider):
    """Download one shard of WoS records for a saved SearchRecord row and
    yield article / cited-count / relation items."""

    name = "download_by_search_record"
    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={
            "science_article_wos.middlewares.WosCookieMiddleware": 500
        },
        # ITEM_PIPELINES={
        #     "science_article_wos.pipelines.MongoPipeline": 300,
        # },
        REDIS_URL='redis://:kcidea1509@192.168.1.211:6379/10',
        LOG_LEVEL="INFO"
    )

    def spider_opened(self, spider):
        """Load task parameters from the first pending SearchRecord when the
        spider was constructed without an explicit record_id.

        NOTE(review): presumably DatabaseSpider connects this handler to the
        spider_opened signal — confirm, nothing in this class does.
        """
        if self.record_id is None:
            from science_article_wos.dao.database.connection import DatabaseManager
            from science_article_wos.dao.models.search_record import SearchRecord
            # TODO(review): db_url is empty — read the real connection string
            # from settings before this path can work.
            db_url = ""
            db_manager = DatabaseManager(db_url)
            with db_manager.session_scope() as session:
                record = session.query(SearchRecord).filter_by(state="pending").first()
                if record:
                    print(f"查询到记录: {record}")
                    self.record_id = record.record_id
                    self.records_found = record.records_found
                    self.mark_from = record.mark_from
                    self.mark_to = record.mark_to
                    self.shard = record.shard

    def __init__(self, record_id: str = None, mark_from: int = 1, mark_to: int = 500, shard: str | int = None, records_found: int = None, **kwargs):
        super().__init__()
        self.record_id = record_id
        self.records_found = records_found
        self.mark_from = mark_from
        self.mark_to = mark_to
        self.shard = shard
        # Optional ids used to bind downloaded articles to tasks/orgs/queries.
        self.task_id = None
        self.org_id = None
        self.query_id = None
        self.bind_relation_enable = False
        self.bind_relation_d = None
        if self.bind_relation_enable:
            self.build_relation()

    def build_relation(self):
        """Build and store the id-binding dict from task/org/query ids.

        Fix: the original called setdefault() on ``self.bind_relation_d``,
        which is still None at this point (AttributeError) — populate the
        local dict instead, then assign it.
        """
        bind_relation_d = dict()
        if self.task_id:
            bind_relation_d.setdefault("task_ids", maybe_list(self.task_id))
        if self.org_id:
            bind_relation_d.setdefault("school_ids", maybe_list(self.org_id))
        if self.query_id:
            bind_relation_d.setdefault("query_ids", maybe_list(self.query_id))
        self.bind_relation_d = bind_relation_d
        return bind_relation_d

    async def start(self):
        """Issue the export request for this shard's record range."""
        query_id = self.record_id
        records_found = self.records_found
        mark_start = self.mark_from
        mark_end = self.mark_to
        yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
                          data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
                                                               mark_to=mark_end),
                          callback=self.download_parse)

    def download_parse(self, response: Response, **kwargs: Any) -> Any:
        """Parse the exported text into items: one article item per record,
        plus a cited-count item and (when enabled) an id-relation item."""
        parse_count = 0
        batch_time = datetime.now()
        records = parse_full_records(response.body)
        for data_dic in records:
            t_id = data_dic.pop('ut', None)
            if t_id:
                parse_count += 1
                article_item = WosArticleItem()
                article_item['third_id'] = t_id
                article_item['exported'] = data_dic
                article_item['updated_at'] = batch_time
                yield article_item
                # Times-cited count, when present.
                if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
                    cited_item = WosCitedNumberItem()
                    cited_item['third_id'] = t_id
                    cited_item['cited'] = cited_num
                    cited_item['updated_at'] = batch_time
                    yield cited_item
                if self.bind_relation_enable and self.bind_relation_d:
                    # Relations are only bound when explicitly enabled.
                    relation_item = WosIdRelationItem()
                    relation_item['third_id'] = t_id
                    relation_item.update(**self.bind_relation_d)
                    yield relation_item
|
||||
@ -0,0 +1,59 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/12/11 13:56
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : crawl_article_by_qid.py
|
||||
import math
|
||||
from scrapy.crawler import CrawlerProcess
|
||||
from scrapy.utils.project import get_project_settings
|
||||
from science_article_wos.spiders.download_by_search_record import DownloadBySearchRecordSpider
|
||||
|
||||
BATCH_DOWNLOAD_LIMIT = 500
|
||||
|
||||
|
||||
def f(record_id: str, records_found: int, shard_count: int = None, batch_size: int = None):
    """Yield per-shard spider kwargs covering records 1..records_found.

    :param shard_count: number of shards; computed from records_found when None.
    :param batch_size: records per shard; defaults to BATCH_DOWNLOAD_LIMIT
        (generalized from the previously hard-coded constant).
    """
    step = batch_size or BATCH_DOWNLOAD_LIMIT
    mark_start = 1
    mark_end = 0
    idx = 0
    shard_count = shard_count or math.ceil(records_found / step)
    for _ in range(shard_count):
        idx += 1
        mark_end += step

        if mark_end > records_found:
            mark_end = records_found  # clamp the final partial shard

        yield dict(
            record_id=record_id,
            mark_from=mark_start, mark_to=mark_end,
            shard=idx, shard_count=shard_count,
            records_found=records_found
        )

        mark_start += step
|
||||
|
||||
|
||||
def ready():
    """
    Queue the pending crawl tasks into the database.

    TODO(review): unfinished stub — RECORDS_FOUND is assigned but never used
    and nothing is persisted yet.
    :return:
    """
    RECORDS_FOUND = 1486
|
||||
|
||||
|
||||
def test_starter():
    """Manually kick off one DownloadBySearchRecordSpider run with a fixed
    query id and shard."""
    init_params = dict(
        record_id='68ce1627-b4c3-4938-adcb-476c7dcde004-0192d3c012',
        mark_from=1, mark_to=50,
        shard=1, shard_count=51,
        records_found=25256
    )

    crawler_process = CrawlerProcess(get_project_settings())
    crawler_process.crawl(DownloadBySearchRecordSpider, **init_params)
    crawler_process.start()
|
||||
|
||||
|
||||
def starter():
    """Run the spider with no arguments; it pulls its task from the database."""
    crawler_process = CrawlerProcess(get_project_settings())
    crawler_process.crawl(DownloadBySearchRecordSpider)
    crawler_process.start()
|
||||
@ -0,0 +1,70 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/12/15 16:47
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : search_records_orm.py
|
||||
import math
|
||||
from science_article_wos.dao.database.connection import DatabaseManager
|
||||
from science_article_wos.dao.models.search_record import SearchRecord
|
||||
BATCH_DOWNLOAD_LIMIT = 500
|
||||
|
||||
|
||||
def f(record_id: str, records_found: int, shard_count: int = None, batch_size: int = None):
    """Yield per-shard SearchRecord kwargs covering records 1..records_found.

    :param shard_count: number of shards; computed from records_found when None.
    :param batch_size: records per shard; defaults to BATCH_DOWNLOAD_LIMIT
        (generalized from the previously hard-coded constant).
    """
    step = batch_size or BATCH_DOWNLOAD_LIMIT
    mark_start = 1
    mark_end = 0
    idx = 0
    shard_count = shard_count or math.ceil(records_found / step)
    for _ in range(shard_count):
        idx += 1
        mark_end += step

        if mark_end > records_found:
            mark_end = records_found  # clamp the final partial shard

        yield dict(
            record_id=record_id,
            mark_from=mark_start, mark_to=mark_end,
            shard=idx, shard_count=shard_count,
            records_found=records_found
        )

        mark_start += step
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 根据您的数据库类型选择连接字符串
|
||||
# MySQL
|
||||
db_url = "mysql+pymysql://root:admin000@localhost/crawler"
|
||||
|
||||
# SQLite
|
||||
# db_url = "sqlite:///search_records.db"
|
||||
|
||||
# 初始化数据库管理器
|
||||
db_manager = DatabaseManager(db_url)
|
||||
|
||||
# 创建表
|
||||
db_manager.create_tables()
|
||||
|
||||
# 使用示例
|
||||
with db_manager.session_scope() as session:
|
||||
# search_record_id = "02f30273-1342-4d61-9e51-c1ea1f5b2423-0190efdd10"
|
||||
# for d in f(search_record_id, 10641):
|
||||
# # 创建新记录
|
||||
# new_record = SearchRecord(
|
||||
# **d
|
||||
# )
|
||||
#
|
||||
# session.add(new_record)
|
||||
# print(f"记录已添加: {new_record}")
|
||||
# session.commit()
|
||||
|
||||
# 查询记录
|
||||
record = session.query(SearchRecord).filter_by(state="pending").first()
|
||||
if record:
|
||||
print(f"查询到记录: {record}")
|
||||
|
||||
# 更新记录
|
||||
if record:
|
||||
record.state = "processing"
|
||||
record.reason = "正在处理数据"
|
||||
session.commit()
|
||||
print(f"记录已更新: {record}")
|
||||
Loading…
Reference in New Issue