Compare commits

...

16 Commits

Author SHA1 Message Date
zhaoxiangpeng 6c0d732877 cnki: collect citation counts 3 weeks ago
zhaoxiangpeng 9a29a8ace7 change: configuration items 1 month ago
zhaoxiangpeng 5d6a562cca add: utils 1 month ago
zhaoxiangpeng 669c7836b6 add: WOS file parsing 1 month ago
zhaoxiangpeng 6d4b0a7dd9 add: verify integrity of downloaded data 1 month ago
zhaoxiangpeng a86cbc9952 add: WOS items inherit from __init__ 1 month ago
zhaoxiangpeng fea5d948ae add: exception handling 1 month ago
zhaoxiangpeng 5d44e5fe86 change: add single-record insert pipeline 1 month ago
zhaoxiangpeng fe7044292b change: task-completion acknowledgement and DingTalk notification 2 months ago
zhaoxiangpeng 576260f52d change: org_id and org_name changed to list type 2 months ago
zhaoxiangpeng 7cda5cc406 add: collect WOS source via the Starter API 2 months ago
zhaoxiangpeng e3a23ad33e add: collect WOS source via the Starter API 2 months ago
zhaoxiangpeng 78a76ba9f2 add: data insert pipeline 2 months ago
zhaoxiangpeng 1c2fd3c988 add: mongo util 2 months ago
zhaoxiangpeng 129ab6569d add: MySQL task-completion confirmation 2 months ago
zhaoxiangpeng ea68319ee6 add: DingTalk notifications for Scrapy 3 months ago

1
.gitignore vendored

@ -160,3 +160,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
/science_article_add/.idea/

@ -0,0 +1,57 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional, Dict, Tuple
from pymongo import MongoClient
from pymongo import UpdateOne
from pymongo.errors import DuplicateKeyError, BulkWriteError
if TYPE_CHECKING:
from pymongo.database import Database
from pymongo.collection import Collection
from pymongo.results import InsertManyResult, BulkWriteResult
def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]:
update_query = {}
if not update_data:
return {}, {}
for key, val in update_data.items():
if replace:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
else:
if isinstance(val, list):
update_query.setdefault(
"$addToSet", {}
).update({
key: {"$each": val}
})
else:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
return filter_query, update_query
class MongoDBUtils:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client: MongoClient = None
self.db: Database = None
def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult:
collection: Collection = self.db.get_collection(tablename)
result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs)
return result
def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult:
collection: Collection = self.db.get_collection(tablename)
bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs)
return bulk_results
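As a quick sanity check of update_document, here is a small usage sketch (hypothetical values): with replace=False, list values are turned into $addToSet/$each updates while scalars stay under $set.
# hypothetical usage of update_document
filter_q, update_q = update_document(
    {"third_id": "WOS:000123456700"},   # filter query (hypothetical id)
    {"query_ids": [7], "cited": 12},    # update data
    replace=False,
)
# filter_q -> {"third_id": "WOS:000123456700"}
# update_q -> {"$addToSet": {"query_ids": {"$each": [7]}}, "$set": {"cited": 12}}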

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/27 17:12
# @Author : zhaoxiangpeng
# @File : ackextension.py
import logging
import pymysql
from scrapy import signals
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
class SpiderProtocol:
name: str
record_id: int
org_id: int
org_name: str
query_id: int
query_content: str
def get_records_found(self) -> int: ...
class ACKExtension:
def __init__(self, crawler: Crawler):
self.crawler = crawler
self.change_state_sql = 'update task_batch_record set %(update_kws)s where %(update_cond)s'
@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler=crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
return ext
def spider_opened(self, spider):
kws = {
'is_done': 2,
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_closed(self, spider: SpiderProtocol):
"""
Update the task state
Send the notification
"""
kws = {
'is_done': 1,
'result_count': spider.get_records_found(),
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_error(self, spider: SpiderProtocol):
kws = {
'is_done': -1,
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def _execute_sql(self, sql):
settings = self.crawler.settings
client = pymysql.connect(
host=settings.get('MYSQL_HOST'),
port=settings.get('MYSQL_PORT', 3306),
database=settings.get('MYSQL_DATABASE'),
user=settings.get('MYSQL_USER'),
passwd=settings.get('MYSQL_PASSWORD'),
)
try:
cursor = client.cursor()
cursor.execute(sql)
cursor.connection.commit()
logger.info(f'Execute SQL: {sql}')
except Exception as e:
logger.exception(e)
finally:
client.close()
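For reference, with a hypothetical record_id of 42 and 120 records found, spider_closed renders and executes roughly this statement:
# hypothetical rendering of change_state_sql in spider_closed
sql = "update task_batch_record set is_done=1, result_count=120, updated_time=CURRENT_TIMESTAMP where id=42"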

@ -0,0 +1,240 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/23 16:30
# @Author : deepseek
# @File : dingtalk.py
# extensions/dingtalk_extension.py
import json
import time
import logging
import requests
from scrapy import signals
from scrapy.exceptions import NotConfigured
SPIDER_START_MSG = """🚀 爬虫启动通知\n
**爬虫名称**: %(spider_name)s\n
**开始时间**: %(started_time)s\n
**状态**: 开始运行"""
# SPIDER_CLOSED_MSG = """📊 爬虫完成通知\n
# **爬虫名称**: %(spider_name)s\n
# **完成时间**: %(finished_time)s\n
# **完成原因**: %(finish_reason)s\n
# **采集统计**:\n
# - 采集项目: %(item_scraped_count)s 条
# - 请求响应: %(response_count)s 次
# - 错误数量: %(error_count)s 个\n
# **状态**: %(state)s"""
SPIDER_ERROR_MSG = """🚨 爬虫错误\n
**爬虫名称**: %(spider_name)s\n
**URL**: %(url)s\n
**错误**: %(err_msg)s"""
SPIDER_CLOSED_MSG = """📊 爬虫完成通知\n
**爬虫名称**: %(spider_name)s\n
**机构名称**: %(org_name)s\n
**任务条件**: %(task_condition)s\n
**任务ID**: %(record_id)s\n
**完成时间**: %(finished_time)s\n
**完成原因**: %(finish_reason)s\n
**采集统计**:\n
- 采集项目: %(item_scraped_count)s
- 请求响应: %(response_count)s
- 错误数量: %(error_count)s \n
**状态**: %(state)s"""
class DingTalkExtension:
"""钉钉机器人通知扩展"""
def __init__(
self,
crawler,
webhook_url=None, secret=None,
start_msg=None, closed_msg=None,
**kwargs
):
self.crawler = crawler
self.stats = crawler.stats
self.webhook_url = webhook_url
self.secret = secret
self.spider_start_message = start_msg or SPIDER_START_MSG
self.spider_closed_message = closed_msg or SPIDER_CLOSED_MSG
self.enable_start_notify = kwargs.get("enable_start_notify", False)
self.enable_finish_notify = kwargs.get("enable_finish_notify", False)
self.enable_error_notify = kwargs.get("enable_error_notify", False)
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
# Get the DingTalk webhook URL from the settings
webhook_url = crawler.settings['DINGTALK_WEBHOOK_URL']
if not webhook_url:
raise NotConfigured('DINGTALK_WEBHOOK_URL must be set')
ding_cfg = dict(
webhook_url=crawler.settings.get('DINGTALK_WEBHOOK_URL'),
secret=crawler.settings.get('DINGTALK_SECRET'),
# Custom message templates
start_msg=crawler.settings.get('DINGTALK_START_MESSAGE', SPIDER_START_MSG),
closed_msg=crawler.settings.get('DINGTALK_CLOSED_MESSAGE', SPIDER_CLOSED_MSG),
enable_start_notify=crawler.settings.getbool('DINGTALK_ENABLE_START', False),
enable_finish_notify=crawler.settings.getbool('DINGTALK_ENABLE_FINISH', False),
enable_error_notify=crawler.settings.getbool('DINGTALK_ENABLE_ERROR', False),
)
ext = cls(crawler=crawler, **ding_cfg)
# Register the signal handlers
if ext.enable_start_notify:
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
if ext.enable_finish_notify:
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
if ext.enable_error_notify:
crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
crawler.signals.connect(ext.item_error, signal=signals.item_error)
return ext
def spider_opened(self, spider):
"""爬虫开始时发送通知"""
self.stats = spider.crawler.stats
message = self._get_default_start_message(spider)
self._send_dingtalk_message(message, spider)
def spider_closed(self, spider, reason):
"""爬虫结束时发送通知"""
message = self._get_default_closed_message(spider, reason)
self._send_dingtalk_message(message, spider)
def spider_error(self, failure, response, spider):
"""爬虫错误时发送通知"""
message = SPIDER_ERROR_MSG % {
"spider_name": spider.name,
"url": response.url,
"err_msg": str(failure.value),
}
self._send_dingtalk_message(message, spider, is_error=True)
def item_error(self, failure, response, spider):
pass
def _get_default_start_message(self, spider):
"""默认开始消息模板"""
message = self.spider_start_message % {"spider_name": spider.name, "started_time": self._get_current_time()}
return message
def _get_default_closed_message(self, spider, reason):
"""默认结束消息模板"""
stats = self.stats
# 获取统计信息
item_scraped_count = stats.get_value('item_scraped_count', 0)
response_count = stats.get_value('response_received_count', 0)
error_count = stats.get_value('log_count/ERROR', 0)
finish_reason = self._get_reason_display(reason)
task_obj = spider.task_obj
message = self.spider_closed_message % {
"spider_name": spider.name,
"org_name": task_obj['org_name'],
"task_condition": task_obj['task_condition'],
"record_id": spider.record_id,
"finished_time": self._get_current_time(),
"finish_reason": finish_reason,
"item_scraped_count": spider.get_records_found(),
"response_count": response_count,
"error_count": error_count,
"state": '✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束'
}
return message
def _get_reason_display(self, reason):
"""获取完成原因的可读描述"""
reason_map = {
'finished': '正常完成',
'shutdown': '手动关闭',
'cancelled': '被取消',
}
return reason_map.get(reason, reason)
def _get_current_time(self):
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def _generate_signature(self, timestamp: int) -> str:
"""
Generate the request signature.
Args:
timestamp: timestamp in milliseconds
Returns:
the signature string
"""
if not self.secret:
return ""
import hmac
import hashlib
import base64
import urllib.parse
string_to_sign = f"{timestamp}\n{self.secret}"
hmac_code = hmac.new(
self.secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def _build_webhook_url(self) -> str:
"""
Build the full webhook URL, including the signature.
Returns:
the complete webhook URL
"""
if not self.secret:
return self.webhook_url
timestamp = int(time.time() * 1000)
sign = self._generate_signature(timestamp)
return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
def _send_dingtalk_message(self, message, spider, is_error=False):
"""发送钉钉消息"""
try:
webhook_url = self._build_webhook_url()
headers = {
"Content-Type": "application/json",
"User-Agent": "DingTalk-Bot/1.0"
}
# Build the message body
data = {
"msgtype": "markdown",
"markdown": {
"title": f"爬虫通知 - {spider.name}",
"text": message
},
"at": {
"isAtAll": is_error # 如果是错误,@所有人
}
}
response = requests.post(
webhook_url,
data=json.dumps(data),
headers=headers,
timeout=10
)
if response.status_code == 200:
self.logger.info(f"钉钉通知发送成功: {spider.name}")
else:
self.logger.error(f"钉钉通知发送失败: {response.status_code} - {response.text}")
except Exception as e:
self.logger.error(f"发送钉钉通知时出错: {e}")

@ -0,0 +1,36 @@
import scrapy
from science_article_add.items import ArticleItem, IdRelationItem, ArticleCitedItem
class WosItem(scrapy.Item):
# define the fields for your item here like:
third_id = scrapy.Field()
updated_at = scrapy.Field()
class WosArticleItem(ArticleItem):
__tablename__ = 'data_wos_article'
third_id = scrapy.Field()
"""
WOS article item
"""
exported = scrapy.Field()
updated_at = scrapy.Field()
class WosCitedNumberItem(ArticleCitedItem):
__tablename__ = 'relation_cited_number_wos'
"""发文被引量item"""
third_id = scrapy.Field()
cited = scrapy.Field()
updated_at = scrapy.Field()
class WosIdRelationItem(IdRelationItem):
__tablename__ = 'relation_school_wos'
query_ids = scrapy.Field()
school_ids = scrapy.Field()
task_ids = scrapy.Field()
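A small usage sketch (hypothetical values): the __tablename__ attribute doubles as the target MongoDB collection name, which is how MongoPipelineMulti._get_item_type further down resolves where each item is written.
cited_item = WosCitedNumberItem()
cited_item['third_id'] = 'WOS:000123456700'   # hypothetical WOS uid
cited_item['cited'] = 12
print(cited_item.__class__.__tablename__)     # -> 'relation_cited_number_wos'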

@ -57,3 +57,13 @@ class ScienceArticleAddPipeline(ScienceAddBufferPipeline):
if self.buffer_size >= self.buffer_max_size:
self.buffer.clear()
return item
class BufferPipeline:
def __init__(self, buffer_max_size: int = 100):
self.buffer = {
"type1": [],
"type2": [],
}
self.buffer_size = 0
self.buffer_max_size = buffer_max_size
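The diff only includes the constructor; as a rough sketch (not the repository's actual code), a pipeline of this shape usually appends incoming items to per-type lists and flushes once buffer_size reaches buffer_max_size.
# hypothetical continuation of BufferPipeline
def process_item(self, item, spider):
    self.buffer.setdefault(type(item).__name__, []).append(item)
    self.buffer_size += 1
    if self.buffer_size >= self.buffer_max_size:
        self._flush(spider)
    return item

def _flush(self, spider):
    for item_type, items in self.buffer.items():
        if items:
            spider.logger.info("flushing %d %s items", len(items), item_type)
            items.clear()
    self.buffer_size = 0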

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
# @Time : 2025/11/3 14:16
# @Author : zhaoxiangpeng
# @File : duptodo_pipeline.py
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from pymongo import MongoClient
from scrapy.exceptions import DropItem
from science_article_add.db_utils.buffer_component import SimpleBuffer
from science_article_add.items import IdRelationItem
from science_article_add.db_utils.mongo import MongoDBUtils
if TYPE_CHECKING:
from pymongo.collection import Collection
logger = logging.getLogger(__name__)
class DupTodoPipeline(MongoDBUtils):
def __init__(self, crawler, mongo_uri, mongo_db, buffer_max_size=None):
super().__init__(mongo_uri, mongo_db)
self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
self.stats = crawler.stats
@classmethod
def from_crawler(cls, crawler):
return cls(
crawler=crawler,
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
)
def open_spider(self, spider):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
# Flush all remaining data when the spider closes
for item_type in self.buffer.buffers.keys():
if self.buffer.get_buffer_size(item_type) > 0:
self._flush_buffer(item_type, spider)
def process_item(self, item, spider):
if isinstance(item, IdRelationItem):
# Determine the item type
item_type = self._get_item_type(spider)
# Add the item to the buffer
should_flush = self.buffer.add_item(item, item_type)
# Flush (insert) if the buffer is full
if should_flush:
self._flush_buffer(item_type, spider)
return item
def _flush_buffer(self, item_type: str, spider):
"""刷新缓冲区到数据库"""
items = self.buffer.get_buffer(item_type)
item_count = len(items)
affect_count = 0
try:
affect_count = item_count
dedup_items = self._contrast_dup_item(items, filter_key=self._get_dup_key(spider))
def gen():
for item in dedup_items:
item = {"third_id": item.pop("third_id")}
item['state'] = 0
yield item
datas = [d for d in gen()]
if not datas:
raise DropItem('⏳ 已存在的item: \n[%s]' % ', '.join([x['third_id'] for x in items]))
insert_result = self._insert2db(datas, item_type)
affect_count = len(insert_result.inserted_ids)
except DropItem as drop_item:
logger.warning("Filter item: %s", drop_item)
except Exception as e:
logger.error(f"❌ 插入 %s 失败 %s 条: {e}" % (item_type, item_count))
else:
logger.info('📝 入库 %s 行数 %s 条, 新增 %s 条, 去重 %s' % (
item_type, item_count, affect_count, item_count-affect_count))
finally:
# Clear the buffer
self.buffer.clear_buffer(item_type)
def _contrast_dup_item(self, items, filter_key) -> list:
collection: Collection = self.db.get_collection(filter_key)
all_id_item_map = {item["third_id"]: item for item in items}
fingerprints = list(all_id_item_map.keys())
# Query by third_id and treat the exported.da field as proof that exported data exists; if the field is missing, the export is considered absent
find_results = collection.find(filter={"third_id": {"$in": fingerprints}},
projection={"_id": 0, "third_id": 1, "exported.da": 1})
is_exist = 0
for document in find_results:
if document.get('exported') and ((doc_third_id := document.get('third_id')) in all_id_item_map):
is_exist += 1
all_id_item_map.pop(doc_third_id, None)
self.inc_item_dropped_count('is_exist')
dedup_results = list(all_id_item_map.values())
logger.info('%s 已存在 %s 条, 过滤后 %s' % (filter_key, is_exist, len(dedup_results)))
return dedup_results
def _get_dup_key(self, spider):
return 'data_%(source_type)s_article' % {"source_type": spider.source}
def _get_item_type(self, spider) -> str:
"""获取Item类型"""
return 'todo_ids_%(source_type)s' % {"source_type": spider.source}
def inc_item_dropped_count(self, reason):
self.stats.inc_value("item_dropped_count")
self.stats.inc_value(f"item_dropped_reasons_count/{reason}")
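SimpleBuffer itself is not part of this diff; judging from its call sites here and in the Mongo pipelines, it is assumed to expose roughly the following interface (a sketch, not the repository's implementation):
# assumed shape of SimpleBuffer, inferred from its call sites
class SimpleBuffer:
    def __init__(self, buffer_max_size=100, flush_interval=10):
        self.buffers = {}                 # item_type -> list of buffered items
        self.buffer_max_size = buffer_max_size or 100
        self.flush_interval = flush_interval

    def add_item(self, item, item_type):
        """Append the item and tell the caller whether it should flush now."""
        self.buffers.setdefault(item_type, []).append(item)
        return len(self.buffers[item_type]) >= self.buffer_max_size

    def get_buffer(self, item_type):
        return self.buffers.get(item_type, [])

    def get_buffer_size(self, item_type):
        return len(self.buffers.get(item_type, []))

    def clear_buffer(self, item_type):
        self.buffers.get(item_type, []).clear()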

@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/30 17:30
# @Author : zhaoxiangpeng
# @File : mongo_pipeline.py
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Tuple, Generator
from pymongo import MongoClient
from itemadapter import ItemAdapter
from pymongo.errors import (
DuplicateKeyError,
BulkWriteError
)
from science_article_add.db_utils.buffer_component import SimpleBuffer
from science_article_add.db_utils.mongo import MongoDBUtils, update_document
if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class MongoPipeline(MongoDBUtils):
def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
super().__init__(mongo_uri, mongo_db)
self.stats: StatsCollector = stats
self.insert_failure_update_enable = True
@classmethod
def from_crawler(cls, crawler: Crawler):
return cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
stats=crawler.stats
)
def open_spider(self, spider):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider):
# Determine the item type
adapter = ItemAdapter(item)
item_type = self._get_item_type(item)
collection = self.db.get_collection(item_type)
d = adapter.asdict()
try:
insert_result = collection.insert_one(d)
except DuplicateKeyError as duplicate_error:
if self.insert_failure_update_enable:
write_error = duplicate_error.details
key_pattern = write_error.get('keyPattern')
key_value = write_error.get('keyValue')
logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
[d.pop(k, None) for k in key_pattern.keys()]
up_result = collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
except Exception:
raise
return item
def close_spider(self, spider):
self.client.close()
@staticmethod
def _get_item_type(item) -> str:
"""获取Item类型"""
if hasattr(item, '__tablename__'):
return item.item_type
return 'items_null_table'
class MongoPipelineMulti(MongoDBUtils):
def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
super().__init__(mongo_uri, mongo_db)
self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
)
def open_spider(self, spider):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider):
# Determine the item type
item_type = self._get_item_type(item)
# Add the item to the buffer
should_flush = self.buffer.add_item(item, item_type)
# Flush (insert) if the buffer is full
if should_flush:
self._flush_buffer(item_type)
return item
def close_spider(self, spider):
# Flush all remaining data when the spider closes
for item_type in self.buffer.buffers.keys():
if self.buffer.get_buffer_size(item_type) > 0:
self._flush_buffer(item_type)
def _flush_buffer(self, item_type: str):
"""刷新缓冲区到数据库"""
items = self.buffer.get_buffer(item_type)
item_count = len(items)
if not items:
return
affect_count = 0
try:
affect_count = item_count
# Perform the database insert
insert_result = self._insert2db(items, tablename=item_type)
affect_count = len(insert_result.inserted_ids)
except BulkWriteError as bulk_write_e:
write_errors = bulk_write_e.details.get('writeErrors')
current_time = datetime.now()
up_time_requests = []
errors = self._build__update(write_errors)
collection = self.db.get_collection(item_type)
for new_item in errors:
filter_query, update_query = new_item
up_result = collection.update_one(filter=filter_query, update=update_query)
affect_count -= 1
if up_result.matched_count == up_result.modified_count == 1:
third_id = filter_query.get("third_id")
third_id and up_time_requests.append(third_id)  # collect the third-party id so its batch timestamp can be updated afterwards
if up_time_requests:
logger.info("修改批次时间为: %s" % current_time)
collection.update_many(
{"third_id": {"$in": up_time_requests}},
{"$set": {"updated_at": current_time}}
)
except Exception as e:
logger.error(f"❌ 插入失败: {e}")
# Insert failed; note the buffer is still cleared in the finally block below
finally:
# Clear the buffer
self.buffer.clear_buffer(item_type)
logger.info('✅ 入库 %s 行数 %s 条, 新增 %s 条, 更新 %s' % (
item_type, item_count, affect_count, item_count - affect_count))
def _build__update(self, write_errors) -> Generator[Tuple[dict, dict], None, None]:
for write_error in write_errors:
update_one = None, None
if write_error.get('code') == 11000:
update_one = self._build_dup_error(write_error)
if update_one:
yield update_one
@staticmethod
def _build_dup_error(write_error) -> tuple[None, None] | tuple[dict, dict]:
"""可能被重写实现自定义的逻辑"""
original_doc = write_error.get('op')  # the document that failed to insert
key_pattern = write_error.get('keyPattern')
original_doc.pop("_id", None) # 删掉插入失败产生的_id
filter_query = {}
update_query = {key: val for key, val in original_doc.items() if val}
update_query.pop('updated_at', None)  # drop the volatile timestamp so it does not interfere with the update
for key in key_pattern.keys():
filter_query.update({key: update_query.pop(key, None)})
if not update_query:
return None, None
# Update a single document; whether it actually changed decides if it counts as modified. Changed third_ids are collected so updated_at can be set for the whole batch at once.
filter_query, update_query = update_document(filter_query, update_query, replace=False)
return filter_query, update_query
def _get_item_type(self, item) -> str:
"""获取Item类型"""
return item.__class__.__tablename__
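To make the BulkWriteError branch concrete, here is roughly what a single code-11000 writeError is turned into (hypothetical document values):
# hypothetical writeError produced by a duplicate third_id
write_error = {
    "code": 11000,
    "keyPattern": {"third_id": 1},
    "op": {"_id": "65f0aa00", "third_id": "WOS:000123456700", "query_ids": [7], "cited": 12, "updated_at": None},
}
filter_q, update_q = MongoPipelineMulti._build_dup_error(write_error)
# filter_q -> {"third_id": "WOS:000123456700"}
# update_q -> {"$addToSet": {"query_ids": {"$each": [7]}}, "$set": {"cited": 12}}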

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
# @Time : 2025/12/2 13:34
# @Author : zhaoxiangpeng
# @File : verify_data.py
import logging
from itemadapter import ItemAdapter
from pymongo import MongoClient
from science_article_add.items import ArticleItem
class VerifyDataIntegrity:
def __init__(self, mongo_uri, mongo_db):
self.successful_delete = False
self.batch_ids = set()
self.successful = []
self.logger = logging.getLogger(__name__)
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client: MongoClient = None
self.db = None
@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
c = cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
)
return c
def init_db(self):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def open_spider(self, spider):
spider_batch_ids = spider.get_batch_ids()
for batch in spider_batch_ids:
if batch.get("field", "UT") == "UT":
self.batch_ids.add(batch.get("third_id"))
self.init_db()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
if isinstance(item, ArticleItem):
unique_id = adapter.get("third_id")
self.successful.append(unique_id)
if self.successful_delete:
self.batch_ids.discard(unique_id)
return item
def close_spider(self, spider):
failure = self.batch_ids - set(self.successful)
coll = self.db.get_collection("todo_ids_wos")
if self.successful:
if self.successful_delete:
coll.delete_many(filter={"third_id": {"$in": self.successful}})
self.logger.info("Successfully deleted %d articles", len(self.successful))
else:
coll.update_many(filter={"third_id": {"$in": self.successful}}, update={"$set": {"state": 1}})
self.logger.info("Successfully updated %d articles", len(self.successful))
if failure:
self.logger.warning("未下载到: %s" % list(failure))
coll.update_many(filter={"third_id": {"$in": list(failure)}}, update={"$set": {"state": -1}})
else:
self.logger.info("Successfully verified: %s" % "下载完整无异常")

@ -0,0 +1,40 @@
# pipelines.py
import pymongo
from itemadapter import ItemAdapter
from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_data')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# Store into different collections depending on the item type
if isinstance(item, WosIdRelationItem):
collection_name = 'relation_school_wos'
elif isinstance(item, WosCitedNumberItem):
collection_name = 'relation_cited_number_wos'
else:
collection_name = 'data_other'
# Insert the data
self.db[collection_name].insert_one(dict(adapter))
return item

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# @Time : 2024/3/5 16:05
# @Author : zhaoxiangpeng
# @File : parse_data.py
import logging
from typing import Union
from science_article_add.utils.tools import str2int
logger = logging.getLogger(__name__)
DEFAULT_TABLE_HEAD = ['PT', 'AU', 'BA', 'BE', 'GP', 'AF', 'BF', 'CA', 'TI', 'SO', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'ID', 'AB', 'C1', 'C3', 'RP', 'EM', 'RI', 'OI', 'FU', 'FP', 'FX', 'CR', 'NR', 'TC', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'PN', 'SU', 'SI', 'MA', 'BP', 'EP', 'AR', 'DI', 'DL', 'D2', 'EA', 'PG', 'WC', 'WE', 'SC', 'GA', 'PM', 'OA', 'HC', 'HP', 'DA', 'UT']
DEFAULT_TABLE_HEAD_LOWER = [h.lower() for h in DEFAULT_TABLE_HEAD]
def to_dict(data, headers: list):
data_text = data.strip().decode()
_to_dict = {}
for key, value in zip(headers, data_text.split('\t')):
if not value:
value = None
_to_dict[key] = value
vyear = None
try:
vyear = str2int(_to_dict.get("py"), None)
if not vyear:
logger.warning("WOS号: %s,年份异常: %s" % (_to_dict["ut"], _to_dict.get("py")))
except Exception as e:
logger.exception("""
原始数据: %s,
数据字典: %s
异常信息: %s""" % (data, _to_dict, e))
_to_dict["py"] = vyear
return _to_dict
def parse_full_records_txt(content: bytes):
lines = content.strip().split(b'\r\n')
head_line = lines.pop(0)
try:
head_start = head_line.index(b'PT')
head_line = head_line[head_start:]
head_line = head_line.strip().decode('utf-8')
HEADERS = head_line.split('\t')
HEADERS = [s.lower() for s in HEADERS]
except ValueError:
logger.error("内容出现异常跳过: %s" % head_line)
HEADERS = ['PT', 'AU', 'Z2', 'AF', 'BA', 'BF', 'CA', 'GP', 'BE', 'TI', 'Z1', 'SO', 'Z3', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'Z5', 'ID', 'AB', 'Z4', 'C1', 'Z6', 'RP', 'EM', 'Z7', 'RI', 'OI', 'FU', 'FX', 'CR', 'NR', 'TC', 'Z9', 'Z8', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'SI', 'PN', 'SU', 'MA', 'BP', 'EP', 'AR', 'DI', 'D2', 'EA', 'EY', 'PG', 'P2', 'WC', 'SC', 'PM', 'UT', 'OA', 'HP', 'HC', 'DA', 'C3']
HEADERS = [s.lower() for s in HEADERS]
while lines:
line_data = lines.pop(0)
# print(line_data)
standard_data = to_dict(line_data, HEADERS)
# third_id = standard_data.pop('ut', None)
# if not third_id:
# continue
yield standard_data
def parse_full_records(body: Union[bytes, str]):
"""
Parse the downloaded response body.
"""
if isinstance(body, str):
body = body.encode()
item_g = parse_full_records_txt(body)
for data_dic in item_g:
yield data_dic
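A minimal usage sketch with a one-record toy export (hypothetical values; real WOS exports carry the full header listed in DEFAULT_TABLE_HEAD):
# toy tab-delimited export: one header row, one record
raw = b"PT\tPY\tUT\r\nJ\t2024\tWOS:000123456700\r\n"
for record in parse_full_records(raw):
    print(record)   # {'pt': 'J', 'py': 2024, 'ut': 'WOS:000123456700'}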

@ -14,9 +14,8 @@ NEWSPIDER_MODULE = "science_article_add.spiders"
ADDONS = {}
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "science_article_add (+http://www.yourdomain.com)"
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
@ -48,26 +47,42 @@ DOWNLOAD_DELAY = 1
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
RETRY_ENABLED = True
RETRY_TIMES = 2  # number of retry attempts
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 400, 403, 404]  # includes some common error codes
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550
# "org_news.middlewares.OrgNewsDownloaderMiddleware": 543,
}
#DOWNLOADER_MIDDLEWARES = {
# "science_article_add.middlewares.ScienceArticleAddDownloaderMiddleware": 543,
#}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
EXTENSIONS = {
# "scrapy.extensions.telnet.TelnetConsole": None,
# "science_article_add.extensions.ackextension.ACKExtension": 0,
# "science_article_add.extensions.dingtalk_extension.DingTalkExtension": 0,
}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
# "science_article_add.pipelines.ScienceArticleAddPipeline": 300,
#}
# MONGO_URI = "mongodb://root:123456@192.168.1.211:27017/"
# MONGO_DATABASE = "science2"
MONGO_URI = "mongodb://science-dev:kcidea1509!%25)(@101.43.239.105:27017/?authSource=science&directConnection=true"
MONGO_DATABASE = 'science2'
# REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'
REDIS_URL = 'redis://:kcidea1509!%)(@43.140.203.187:6379/10'
# MySQL configuration
MYSQL_HOST = '43.140.203.187'
MYSQL_PORT = 3306
MYSQL_DATABASE = 'science_data_dept'
MYSQL_USER = 'science-data-dept'
MYSQL_PASSWORD = 'datadept1509'
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
@ -92,3 +107,26 @@ REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"
# DingTalk bot configuration
DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=1252fe1ef63e95ced11ac87a01e9978670e82036a516c558e524f89e11513f9f'
DINGTALK_SECRET = 'SECe77fe7cd6c0dbfcdd9ebe6ba1941ddc376be86ca717e9d68bb177b7eded71091'
# Custom message templates (optional)
DINGTALK_START_MESSAGE = "🚀 爬虫启动啦!\n**爬虫**: %(spider_name)s\n**时间**: %(started_time)s"
# DINGTALK_CLOSED_MESSAGE = "✅ 爬虫完成!\n**爬虫**: %(spider_name)s\n**项目数**: %(item_scraped_count)s"
# Enable/disable specific notifications
DINGTALK_ENABLE_START = False
DINGTALK_ENABLE_FINISH = True
DINGTALK_ENABLE_ERROR = True
DINGTALK_CLOSED_MESSAGE = """📊 爬虫完成通知\n
**爬虫名称**: %(spider_name)s\n
**机构名称**: %(org_name)s\n
**任务条件**: %(task_condition)s\n
**任务ID**: %(record_id)s\n
**完成时间**: %(finished_time)s\n
**完成原因**: %(finish_reason)s\n
**采集统计**:\n
- 采集项目: %(item_scraped_count)s
- 请求响应: %(response_count)s
- 错误数量: %(error_count)s \n
**状态**: %(state)s"""

@ -1,9 +1,12 @@
import math
from datetime import datetime
from urllib.parse import urlencode
from copy import deepcopy
import scrapy
from scrapy.http.response.json import JsonResponse
from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem
from science_article_add.models import wos_model as model
from science_article_add.configs import wos as config
from science_article_add.utils import tools
@ -19,120 +22,102 @@ class WosLatestIncrementSpider(scrapy.Spider):
# start_urls = ["https://wos-api.clarivate.com/api/woslite"]
custom_settings = dict(
    DOWNLOADER_MIDDLEWARES={
        "science_article_add.middlewares.wos.WosStarterApiXkeyDownloaderMiddleware": 500
    },
    ITEM_PIPELINES={
        "science_article_add.pipelines.mongo.MongoPipelineMulti": 300,
        "science_article_add.pipelines.duptodo.DupTodoPipeline": 400,
    },
    LOG_LEVEL="INFO"
)
source = "wos"
def __init__(self, task_obj):
    scrapy.Spider.__init__(self)
    self.task_obj = task_obj
    self.record_id = task_obj['task_id']
    self.org_id = self.tolist(task_obj['org_id'])
    self.org_name = self.tolist(task_obj['org_name'])
    self.query_id = task_obj['query_id']
    self.query_content = task_obj['content']
    self.query_condition = task_obj['task_condition']
    self.first_page = task_obj.get('first_page', 1)
    self._records_found = 0
@staticmethod
def tolist(datas) -> list:
    if isinstance(datas, (list, tuple, set)):
        return list(set(datas))
    else:
        raise TypeError("不支持的类型:%s" % (type(datas)))
async def start(self):
    full_query = self.query_content
    if self.query_condition is not None:
        full_query = '%(query)s%(condition)s' % {
            'query': f'({self.query_content})' if self.query_condition else self.query_content,
            'condition': ' ' + self.query_condition if self.query_condition else ''
        }
    self.logger.info(f'full_query: {full_query}')
    meta = dict(q=full_query, page=self.first_page, limit=50, detail="short")
    params = model.starter_documents_get(**meta)
    enc_params = urlencode(params, doseq=True)
    yield scrapy.Request(url=config.WOS_STARTER_DOCUMENT_API + '?' + enc_params,
                         meta=meta)
async def parse(self, response: JsonResponse, **kwargs):
    meta = response.meta
    request: scrapy.Request = response.request
    task_query_id: int = self.query_id
    task_org_id: list = self.org_id
    task_record_id: int = self.record_id
    self.logger.debug('%s: %s' % ('parse_query_api', meta))
    if response.status != 200:
        self.logger.warning("""
响应异常
状态码: %s
响应内容: %s""" % (response.status, response.text))
    req_meta = request.meta
    resp_result = response.json()
    metadata: dict = resp_result.get("metadata")
    current_page = metadata.get("page")
    records_found = metadata.get('total')
    max_page = req_meta.get("MAX_PAGE")
    if req_meta.get("page") == self.first_page:
        self.logger.info("""
检索式: %s
检索到结果: %s""" % (req_meta.get("q"), records_found))
        self.set_records_found(records_found)
        max_page = req_meta["MAX_PAGE"] = math.ceil(records_found / config.WOS_STARTER_PER_PAGE_LIMIT)
    batch_time = datetime.now()
    hits: list = resp_result.get("hits")
    for record in hits:
        cited_num = tools.get_list_key(array=record.get("citations"), target="count", condition=("db", "WOS"))
        if cited_num:
            cited_item = WosCitedNumberItem()
            cited_item['third_id'] = record.get("uid")
            cited_item['cited'] = cited_num
            cited_item['updated_at'] = batch_time
            yield cited_item
        relation_item = WosIdRelationItem()
        relation_item['third_id'] = record.get("uid")
        relation_item['query_ids'] = [task_query_id]
        relation_item['school_ids'] = task_org_id
        relation_item['task_ids'] = [task_record_id]
        relation_item['updated_at'] = batch_time
        yield relation_item
    if current_page < max_page:
        meta_copy: dict = deepcopy(req_meta)
        meta_copy.update({'page': meta_copy['page'] + 1})
        yield scrapy.Request(
            config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(model.starter_documents_get(**meta_copy)),
            meta=meta_copy)
def set_records_found(self, val):
    self._records_found = val
def get_records_found(self) -> int:
    return self._records_found
# --- removed in this change: the previous WOS Lite API flow (old tail of parse and the parse_query_api callback) ---
query_result = resp_result.get('QueryResult')
datas = resp_result.get('Data')
query_id = query_result.get('QueryID')
records_found = query_result.get('RecordsFound')
max_page = math.ceil(records_found / 100)
meta_copy: dict = deepcopy(meta)
meta_copy.update({'MAX_PAGE': max_page})
meta_copy.update({'TOTAL': records_found})
meta_copy.update({'QUERY_ID': query_id})
meta_copy.update({'next_page': meta['PAGE'] + 1})
meta_copy.update({'PAGE': meta['PAGE'] + 1})
meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
for data in datas:
    add_item = WosLiteAddItem()
    # prefer the explicitly specified year when storing
    to_db_year = meta.get("search_year")
    if not to_db_year:
        publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
        if publish_year:
            to_db_year = tools.str2int(publish_year[0])
    add_item["third_id"] = data.get('UT')
    add_item["year"] = to_db_year
    add_item["query_ids"] = [task_query_id]
    add_item["school_ids"] = [task_org_id]
    add_item["task_ids"] = [task_record_id]
    yield add_item
yield scrapy.Request(
    url=config.WOS_LITE_QUERY_API + f'/{query_id}',
    body=model.lite_query_model(**meta_copy),
    meta=meta_copy,
    callback=self.parse_query_api,
)
async def parse_query_api(self, response, **kwargs):
    meta = response.meta
    task_query_id = self.query_id
    task_org_id = self.org_id
    task_record_id = self.record_id
    self.logger.debug("""
%s
%s""" % ('parse_query_api', meta))
    resp_result = response.json()
    query_id = meta.get('QUERY_ID')
    datas = resp_result.get('Data', [])
    if len(datas):
        for data in datas:
            add_item = WosLiteAddItem()
            # prefer the explicitly specified year when storing
            to_db_year = meta.get("search_year")
            if not to_db_year:
                publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
                if publish_year:
                    to_db_year = tools.str2int(publish_year[0])
            add_item["third_id"] = data.get('UT')
            add_item["year"] = to_db_year
            add_item["query_ids"] = [task_query_id]
            add_item["school_ids"] = [task_org_id]
            add_item["task_ids"] = [task_record_id]
            yield add_item
    else:
        # build an SQL condition from the task record to log the result
        update_state_condition = ('batch_date = "%(batch_date)s"\n'
                                  'query_id = %(query_id)s\n'
                                  'task_condition = "%(task_condition)s"') % self.task_obj
        self.logger.warning('没有数据了\n%s' % update_state_condition)
        return
    if meta['first_record'] < meta['TOTAL']:
        meta_copy = deepcopy(meta)
        meta_copy.update({'next_page': meta['PAGE'] + 1})
        meta_copy.update({'PAGE': meta['PAGE'] + 1})
        meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
        yield scrapy.Request(
            url=config.WOS_LITE_QUERY_API + f'/{query_id}',
            body=model.lite_query_model(**meta_copy),
            meta=meta_copy,
            callback=self.parse_query_api,
        )

@ -1,3 +1,7 @@
from typing import List, Tuple
from datetime import datetime
def str2int(val, replace=0):
try:
val = int(val)
@ -5,4 +9,24 @@ def str2int(val, replace=0):
val = replace
except TypeError:
val = replace
return val
def get_today_date(fmt: str = "%Y-%m-%d"):
return datetime.today().strftime(fmt)
def get_list_key(array: List[dict], target: str, condition: Tuple[str, str]):
"""
Given a list like [{key: val1, target: val2}, {key: val1, target: val2}],
return the value of `target` from the first dict whose condition key matches the condition value.
:param target:
:param condition:
:param array:
:return:
"""
n, v = condition
for dic in array:
if dic.get(n) == v:
return dic.get(target)
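This is the helper the WOS spider uses to pull the WOS citation count out of the Starter API's citations array, e.g. (hypothetical payload):
citations = [{"db": "WOS", "count": 12}, {"db": "BCI", "count": 3}]
get_list_key(array=citations, target="count", condition=("db", "WOS"))   # -> 12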

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# @Time : 2024/5/9 11:11
# @Author : zhaoxiangpeng
# @File : config.py
# Data source name
SOURCE_NAME = 'cnki'
# Journal navigator
# Home page
CNKI_JOURNAL_NAVIGATOR_INDEX = 'https://navi.cnki.net/knavi/journals/index?uniplatform=NZKPT'
# Search API
CNKI_JOURNAL_NAVIGATOR_SEARCH_API = 'https://navi.cnki.net/knavi/journals/searchbaseinfo'
# Legacy xls export API (export by filename)
CNKI_EXPORT_XLS_OLD_API = 'https://kns.cnki.net/dm/manage/FileToText'
# xls export API
CNKI_EXPORT_XLS_API = 'https://kns.cnki.net/dm8/FileToText'
# Journal detail page
CNKI_JOURNAL_DETAIL = 'https://navi.cnki.net/knavi/journals/{journal_no}/detail?uniplatform=NZKPT'
# API on the journal detail page for the list of publication years/issues
CNKI_JOURNAL_ISSUE = 'https://navi.cnki.net/knavi/journals/{journal_no}/yearList' # ZDJY
# API on the journal detail page for the article list of a given year/issue
CNKI_JOURNAL_ISSUE_ARTICLE = 'https://navi.cnki.net/knavi/journals/{journal_no}/papers'
# Article detail page
CNKI_ARTICLE_DETAIL = 'https://kns.cnki.net/kcms/detail/detail.aspx?dbcode={db_code}&filename={article_id}'
# -- legacy API
CNKI_ADV_SEARCH_API = 'https://kns.cnki.net/kns8s/brief/grid'
# Request headers for searching
SEARCH_HEADERS = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Cookie': 'Ecp_notFirstLogin=qkFgu9; Ecp_ClientId=o240823084800102418; Ecp_loginuserbk=SJTU; cnkiUserKey=eef4d3aa-1096-bc9e-dff0-74349179c2cc; Ecp_ClientIp=111.186.52.67; UM_distinctid=19366f14e7a832-0f92ef85a35cb5-26001051-1fa400-19366f14e7c14f2; Hm_lvt_dcec09ba2227fd02c55623c1bb82776a=1734079899; Ecp_session=1; SID_kns_new=kns018104; SID_sug=018104; knsLeftGroupSelectItem=; updatetime-advInput=2024-12-19+17%3A42%3A08; knsadv-searchtype=%7B%22BLZOG7CK%22%3A%22gradeSearch%2CmajorSearch%22%2C%22MPMFIG1A%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22T2VC03OH%22%3A%22gradeSearch%2CmajorSearch%22%2C%22JQIRZIYA%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22S81HNSV3%22%3A%22gradeSearch%22%2C%22YSTT4HG0%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22ML4DRIDX%22%3A%22gradeSearch%2CmajorSearch%22%2C%22WQ0UVIAA%22%3A%22gradeSearch%2CmajorSearch%22%2C%22VUDIXAIY%22%3A%22gradeSearch%2CmajorSearch%22%2C%22NN3FJMUV%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22LSTPFY1C%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22HHCPM1F8%22%3A%22gradeSearch%2CmajorSearch%22%2C%22OORPU5FE%22%3A%22gradeSearch%2CmajorSearch%22%2C%22WD0FTY92%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22BPBAFJ5S%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22EMRPGLPA%22%3A%22gradeSearch%2CmajorSearch%22%2C%22PWFIRAGL%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22U8J8LYLV%22%3A%22gradeSearch%2CmajorSearch%22%2C%22R79MZMCB%22%3A%22gradeSearch%22%2C%22J708GVCE%22%3A%22gradeSearch%2CmajorSearch%22%2C%22HR1YT1Z9%22%3A%22gradeSearch%2CmajorSearch%22%2C%22JUP3MUPD%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22NLBO1Z6R%22%3A%22gradeSearch%2CmajorSearch%22%2C%22RMJLXHZ3%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%221UR4K4HZ%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22NB3BWEHK%22%3A%22gradeSearch%2CmajorSearch%22%2C%22XVLO76FD%22%3A%22gradeSearch%2CmajorSearch%22%7D; createtime-advInput=2024-12-20%2014%3A37%3A03; LID=WEEvREcwSlJHSldSdmVpanJGNW9JQS9sbkNrOUFycHJkRzF3eXgyTGlWbz0=$9A4hF_YAuvQ5obgVAqNKPCYcEjKensW4IQMovwHtwkF4VYPoHbKxJw!!; Ecp_LoginStuts={"IsAutoLogin":false,"UserName":"SJTU","ShowName":"%E4%B8%8A%E6%B5%B7%E4%BA%A4%E9%80%9A%E5%A4%A7%E5%AD%A6","UserType":"bk","BUserName":"","BShowName":"","BUserType":"","r":"qkFgu9","Members":[]}; KNS2COOKIE=1734680479.883.14106.830885|b25e41a932fd162af3b8c5cff4059fc3; dblang=both; c_m_LinID=LinID=WEEvREcwSlJHSldSdmVpanJGNW9JQS9sbkNrOUFycHJkRzF3eXgyTGlWbz0=$9A4hF_YAuvQ5obgVAqNKPCYcEjKensW4IQMovwHtwkF4VYPoHbKxJw!!&ot=12%2F20%2F2024%2016%3A01%3A27; c_m_expire=2024-12-20%2016%3A01%3A27; tfstk=gnXZLQYMKRewdgBaoHvqL9aIUYp9sd45ntTXmijDfFYG5iTcTZbBCGsccx-D-NdjCxY18pQRVAC_6ITq0dBC1xT_WKScPKz7P8w5XGpynzaShW0gBdKqnncilpDHmK-i1ZwdGGpvnyaM9UCdXabz7TCMnkJH4ncDnxYMtk-6qKDMiAcn-eKDnKADjDYH4nmioAYgYMYpDKxcoCcmtGjmL3Og25LCsWPKUCYljekmU0KHslSnGAMsnhA9rBxrnH6ebC8ljOHkrv-hd9RWOmayKgCCSHJz3vvwaOBytO4K3BQ2-IWMh0kcYNshNIWgD5IF3FRlIBoS3dIpmZAV9zkWbd1eaO5TD2jGPF5kBiiz5MRPTQKHtmlMC_s5HQXgQ4LBwn7y4NuN4DuvxG5lH1umgCxpYUZUY7E40mtBH0LEMjdHeH87fhGxMCxpYUZUYjhvteKePlt1.; searchTimeFlags=1; updatetime-advInput=2024-12-19+17%3A42%3A08',
'Origin': 'https://kns.cnki.net',
'Referer': 'https://kns.cnki.net/kns8s/AdvSearch?crossids=YSTT4HG0%2CLSTPFY1C%2CJUP3MUPD%2CMPMFIG1A%2CWQ0UVIAA%2CBLZOG7CK%2CPWFIRAGL%2CEMRPGLPA%2CNLBO1Z6R%2CNN3FJMUV',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
}
# Request headers for the journal navigator page
JOURNAL_NAVIGATOR_HEADERS = {
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': 'https://navi.cnki.net',
'Referer': 'https://navi.cnki.net/knavi/journals/index?uniplatform=NZKPT',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'uniplatform': 'NZKPT',
}
# MongoDB collection configuration
# Cache key for the anti-bot (risk control) parameters
FEC_REDIS_KEY = "cookies_pool:cnki:crypt"
FEC_REDIS_TTL = 3600
# Detail-page cookies
COOKIES_REDIS_KEY = "cookies_pool:cnki:detail_cookies"
# Journal info collection
CNKI_JOURNAL_INFO_COLLECTION = 'task_journal_info_{}'.format(SOURCE_NAME)
# Journal year/volume/issue task collection
CNKI_JOURNAL_ISSUE_COLLECTION = 'task_journal_issue_{}'.format(SOURCE_NAME)
# Journal article collection
CNKI_JOURNAL_ARTICLE_COLLECTION = 'data_{}_article'.format(SOURCE_NAME)
# Collection of article IDs pending download
CNKI_ARTICLE_TODO_IDS_COLLECTION = 'todo_ids_{}'.format(SOURCE_NAME)
# Collection of detail-page IDs pending download
CNKI_ARTICLE_DETAIL_TODO_IDS_COLLECTION = 'todo_ids_cnki_detail'
# Article author/affiliation relations
CNKI_ARTICLE_AUTHOR_ORG_COLLECTION = "relation_author_org_cnki"
# Article relation collection
SCHOOL_RELATION_COLLECTION = 'relation_school_{}'.format(SOURCE_NAME)
# Collections used for the Chinese journal list
CHECK_JOURNAL_INFO_TABLE = "check_journal_info_{}".format(SOURCE_NAME) # info collection
CHECK_JOURNAL_MIDDLE_TABLE = "check_journal_middle_{}".format(SOURCE_NAME) # intermediate task collection
CHECK_JOURNAL_ISDOWN_TABLE = "check_journal_isdown_{}".format(SOURCE_NAME) # result collection
# xls file header
TABLE_HEAD = ['SrcDatabase-来源库', 'Title-题名', 'Author-作者', 'Organ-单位', 'Source-文献来源', 'Keyword-关键词', 'Summary-摘要', 'PubTime-发表时间', 'FirstDuty-第一责任人', 'Fund-基金', 'Year-年', 'Volume-卷', 'Period-期', 'PageCount-页码', 'CLC-中图分类号', 'ISSN-国际标准刊号', 'URL-网址', 'DOI-DOI']
# JSON field names corresponding to the header
TABLE_HEAD_EN = ['src_db', 'title', 'author', 'org', 'journal', 'keyword', 'abstract', 'pub_time', 'first_duty', 'fund', 'year', 'volum', 'issue', 'page', 'classification_code', 'issn', 'url', 'doi']
# Number of records per download batch
BATCH_DOWNLOAD_LIMIT = 50

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
# @Time : 2024/5/13 16:53
# @Author : zhaoxiangpeng
# @File : extract_rule.py
# Extract the ISSN number
ISSN_REGEX_PATTERN = r'ISSN(\d{4}-[\dX]{4})'
# Extract the CN number, https://baike.baidu.com/item/%E5%9B%BD%E5%86%85%E7%BB%9F%E4%B8%80%E5%88%8A%E5%8F%B7/386463
CN_REGEX_PATTERN = r'CN(\d{2}-\d{4}/?[A-Z]?)'
# Remove/replace special characters in titles
DEL_TITLE_SYMBOL_PATTERN = '[!"#$%&\'()*+,-.·/:;<=>—?@,。?★、…()【】《》?“”‘’![\\]^_`{|}~\s]+'
# The same pattern, used for source (journal) names
DEL_SOURCE_SYMBOL_PATTERN = DEL_TITLE_SYMBOL_PATTERN
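A short sketch of applying the two extraction patterns with re (hypothetical input strings):
import re

re.search(ISSN_REGEX_PATTERN, "ISSN1006-2157").group(1)   # -> '1006-2157'
re.search(CN_REGEX_PATTERN, "CN11-3574/R").group(1)       # -> '11-3574/R'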

@ -0,0 +1,87 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional, Dict, Tuple
from pymongo import MongoClient
from pymongo import UpdateOne
from pymongo.errors import DuplicateKeyError, BulkWriteError
if TYPE_CHECKING:
from pymongo.database import Database
from pymongo.collection import Collection
from pymongo.results import InsertManyResult, BulkWriteResult
def build_update_query(update_data: dict, replace: bool = True) -> dict:
"""
If replace is True, overwrite the existing document fields directly.
"""
update_query = {}
if not update_data:
return {}
for key, val in update_data.items():
if replace:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
else:
if isinstance(val, list):
update_query.setdefault(
"$addToSet", {}
).update({
key: {"$each": val}
})
else:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
return update_query
def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]:
update_query = {}
if not update_data:
return {}, {}
for key, val in update_data.items():
if replace:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
else:
if isinstance(val, list):
update_query.setdefault(
"$addToSet", {}
).update({
key: {"$each": val}
})
else:
update_query.setdefault(
"$set", {}
).update(
{key: val}
)
return filter_query, update_query
class MongoDBUtils:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client: MongoClient = None
self.db: Database = None
def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult:
collection: Collection = self.db.get_collection(tablename)
result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs)
return result
def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult:
collection: Collection = self.db.get_collection(tablename)
bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs)
return bulk_results

@ -0,0 +1,40 @@
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class ScienceArticlCnkiItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
pass
class AddItemBase(scrapy.Item):
third_id = scrapy.Field()
updated_at = scrapy.Field()
class ArticleItem(AddItemBase):
exported = scrapy.Field()
class IdRelationItem(AddItemBase):
query_ids = scrapy.Field()
school_ids = scrapy.Field()
task_ids = scrapy.Field()
class ArticleCitedItem(AddItemBase):
cited = scrapy.Field()
class CnkiCitedNumberItem(ArticleCitedItem):
__tablename__ = 'relation_cited_number_cnki'
"""发文被引量item"""
third_id = scrapy.Field()
cited = scrapy.Field()
updated_at = scrapy.Field()

@ -9,7 +9,7 @@ from scrapy import signals
from itemadapter import ItemAdapter
class ScienceArticlCnkiSpiderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@ -53,7 +53,7 @@ class ScienceArticleAddSpiderMiddleware:
spider.logger.info("Spider opened: %s" % spider.name) spider.logger.info("Spider opened: %s" % spider.name)
class ScienceArticleAddDownloaderMiddleware: class ScienceArticlCnkiDownloaderMiddleware:
# Not all methods need to be defined. If a method is not defined, # Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the # scrapy acts as if the downloader middleware does not modify the
# passed objects. # passed objects.
@ -100,14 +100,17 @@ class ScienceArticleAddDownloaderMiddleware:
spider.logger.info("Spider opened: %s" % spider.name) spider.logger.info("Spider opened: %s" % spider.name)
class WosLiteApiXkeyDownloaderMiddleware: from scrapy.http.headers import Headers
async def process_request(self, request, spider):
key_param = {
'X-ApiKey': '941a216f25cbef0f80ee4ba58a08ef1e19dee7a4'
}
if not request.headers:
request.headers = key_param
return request
request.headers.update(key_param)
return request class CnkiSearchHeadersDownloaderMiddleware:
def __init__(self, custom_headers: dict):
self.custom_headers = custom_headers
@classmethod
def from_crawler(cls, crawler):
return cls(custom_headers=crawler.settings['SEARCH_REQUEST_HEADERS'])
def process_request(self, request, spider):
request.headers = Headers(self.custom_headers)
return None

@ -0,0 +1,630 @@
# -*- coding: utf-8 -*-
# @Time : 2023/2/9 14:24
# @Author : zhaoxiangpeng
# @File : model.py
import re
import json
from typing import Union, List, Dict
from datetime import datetime, timedelta
from science_article_cnki.models.enum_cls import ResourceType, SearchTypeId, SearchFieldEnum, OperatorEnum, LogicEnum, SearchFromId
DB_CODE = {
'CFLS': '总库',
'CFLQ': '期刊',
'CDMD': '学位论文',
'CFLP': '会议',
'CCND': '报纸'
}
def export2adv(query):
"""
Convert a professional-search query into an advanced-search query.
:param query:
:return:
"""
if query.find('%=') != -1:
query = '(作者单位:%(name)s(模糊)' % {"name": query.split('%=')[-1]}
return query
def navigator_body(query: str = None, db_code: str = 'CFLS', **kwargs):
if query is None:
raise ValueError("query 不能为空应为检索式AF=上海交通大学,具体检索式见 "
"https://piccache.cnki.net/2022/kdn/index/helper/manual.html#frame2-1-5")
_param = {
'queryJson': json.dumps({
"Platform": "",
"DBCode": db_code,
"KuaKuCode": "CJFQ,CDMD,CIPD,CCND,CISD,SNAD,BDZK,CCJD,CCVD,CJFN",
"QNode": {
"QGroup": [
{"Key": "Subject", "Title": "", "Logic": 4, "Items": [
{"Key": "Expert", "Title": "", "Logic": 0, "Name": "", "Operate": "", "Value": query,
"ExtendType": 12, "ExtendValue": "中英文对照", "Value2": "", "BlurType": ""}],
"ChildItems": []},
{"Key": "ControlGroup", "Title": "", "Logic": 1, "Items": [], "ChildItems": []}
]
},
"CodeLang": ""
})
}
return _param
def signal_body(query: str = None, resource_type: str = 'JOURNAL', group_id: str = 'YE', **kwargs):
"""
Get the aggregation for a single category in the left navigation bar.
:group_id: main topic 1; subject 2; publication year 3; research level 4; document type 5; source 6; author 7; institution 8; fund 9
:return:
"""
if query is None:
raise ValueError("query 不能为空应为检索式AF=上海交通大学,具体检索式见 "
"https://piccache.cnki.net/2022/kdn/index/helper/manual.html#frame2-1-5")
_param = {
'queryJson': json.dumps({
"Platform": "",
"Resource": ResourceType[resource_type].name,
"Classid": ResourceType[resource_type].value,
"Products": "",
"QNode": {
"QGroup": [{
"Key": "Subject",
"Title": "",
"Logic": 0,
"Items": [{
"Key": "Expert",
"Title": "",
"Logic": 0,
"Field": "EXPERT",
"Operator": 0,
"Value": query,
"Value2": ""
}],
"ChildItems": []
}, {
"Key": "ControlGroup",
"Title": "",
"Logic": 0,
"Items": [],
"ChildItems": []
}]
},
"ExScope": "1",
"SearchType": SearchTypeId.GROUP.value,
"Rlang": "CHINESE",
"KuaKuCode": ""
}, ensure_ascii=False),
'groupId': group_id
}
return _param
def refine_search(query: str, resource_type: str = 'JOURNAL', year=None, subject=None, code=None, **kwargs):
"""
After searching with a professional query, refine the search again by year.
"""
_query = {
"Platform": "",
"Resource": ResourceType[resource_type].name,
"Classid": ResourceType[resource_type].value,
"Products": "",
"QNode": {
"QGroup": [{
"Key": "Subject",
"Title": "",
"Logic": 0,
"Items": [{
"Key": "Expert",
"Title": "",
"Logic": 0,
"Field": "EXPERT",
"Operator": 0,
"Value": query,
"Value2": ""
}],
"ChildItems": []
}, {
"Key": "ControlGroup",
"Title": "",
"Logic": 0,
"Items": [],
"ChildItems": []
}]
},
"ExScope": "1",
"SearchType": SearchTypeId.GROUP.value,
"Rlang": "CHINESE",
"KuaKuCode": "",
"View": "changeDBOnlyFT"
}
_group2 = {
"Key": "MutiGroup",
"Title": "",
"Logic": 0,
"Items": [],
"ChildItems": []
}
if year:
year_param = {
"Key": "YE",
"Title": "",
"Logic": 0,
"Items": [{
"Key": year,
"Title": "%s" % year,
"Logic": 1,
"Field": "YE",
"Operator": "DEFAULT",
"Value": year,
"Value2": "",
"Name": "YE",
"ExtendType": 0
}],
"ChildItems": []
}
_group2['ChildItems'].append(year_param)
if subject:
subject_param = {
'Key': '6',
'Title': '',
'Logic': 1,
'Items': [{
'Key': code + '?',
'Title': subject,
'Logic': 2,
'Name': '专题子栏目代码',
'Operate': '',
'Value': code + '?',
'ExtendType': 14,
'ExtendValue': '',
'Value2': '',
'BlurType': ''
}],
'ChildItems': []
}
_group2['ChildItems'].append(subject_param)
_query['QNode']['QGroup'].append(_group2)
return _query
def query_search(query_body, page: int = 1, handler_id: str = 18, sql: str = None,
sort: str = 'desc', sort_field: str = 'PT', **kwargs):
"""
Build the search request body.
:param query_body: the detailed query used for searching, same structure as the left-navigation body
:param page: page number to request
:param handler_id: may need to be carried along; obtained from the page source
:param sql: usually not needed in the page source
:param sort: sort order, desc/asc
:param sort_field: sort field, PT (publication time) / CF (citations)
:return:
"""
if page == 1:
base_query = query_body.get("QNode", {}).get("QGroup", [{}])[0].get("Items", [{}])[0].get("Value")
aside = '( %s)' % base_query if page == 1 else ''
_query = {
"boolSearch": "true",
"QueryJson": json.dumps(query_body, ensure_ascii=False),
"pageNum": "1",
"pageSize": "50",
"dstyle": "listmode",
"boolSortSearch": "false",
"aside": aside,
"searchFrom": "资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:不限; 来源类别:全部期刊;",
"CurPage": "1"
}
else:
_query = {
'boolSearch': "false",
'QueryJson': json.dumps(query_body, ensure_ascii=False),
'pageNum': page,
'pageSize': 50,
'sortField': sort_field,
'sortType': sort,
'dstyle': 'listmode',
'boolSortSearch': "false",
# 'sentenceSearch': "false",
# 'productStr': 'YSTT4HG0,LSTPFY1C,RMJLXHZ3,JQIRZIYA,JUP3MUPD,1UR4K4HZ,BPBAFJ5S,R79MZMCB,MPMFIG1A,EMRPGLPA,J708GVCE,ML4DRIDX,WQ0UVIAA,NB3BWEHK,XVLO76FD,HR1YT1Z9,BLZOG7CK,PWFIRAGL,NN3FJMUV,NLBO1Z6R,',
'aside': '',
'searchFrom': '资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:不限; 来源类别:全部期刊;',
}
return _query
def get_cnki_export_data(ids: str):
"""
Header:
'SrcDatabase-来源库,Title-题名,Author-作者,Organ-单位,Source-文献来源,Keyword-关键词,Summary-摘要,PubTime-发表时间,FirstDuty-第一责任人,Fund-基金,Year-年,Volume-卷,Period-期,PageCount-页码,CLC-中图分类号,ISSN-国际标准刊号,URL-网址,DOI-DOI,',
:param ids:
:return:
"""
data = {
'FileName': ids,
'DisplayMode': 'selfDefine',
'OrderParam': 0,
'OrderType': 'desc',
'SelectField': 'DB,TI,AU,AF,LY,KY,AB,PT,FI,FU,YE,JU,QI,PM,CLC,SN,ABSTRACT,DI,',
'PageIndex': 1,
'PageSize': 20,
'language': 'CHS',
'uniplatform': 'NZKPT',
'Type': 'xls',
}
return data
def export_data(ids: str):
"""
https://kns.cnki.net/dm/manage/FileToText
:param ids:
:return:
"""
data = {
'FileName': ids,
'DisplayMode': 'selfDefine',
'OrderParam': 0,
'OrderType': 'desc',
'SelectField': 'SrcDatabase-来源库,Title-题名,Author-作者,Organ-单位,Source-文献来源,Keyword-关键词,Summary-摘要,PubTime-发表时间,FirstDuty-第一责任人,Fund-基金,Year-年,Volume-卷,Period-期,PageCount-页码,CLC-中图分类号,ISSN-国际标准刊号,URL-网址,DOI-DOI,',
'PageIndex': 1,
'PageSize': 20,
'language': 'CHS',
'uniplatform': '',
'Type': 'xls',
}
return data
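# Minimal sketch of how this payload could be sent to the endpoint named in the
# docstring above. Using the requests library and these headers is an assumption
# for illustration only; the spiders in this project post through scrapy instead.
#
#   import requests
#   ids = '...'  # export identifiers collected from the result list
#   resp = requests.post('https://kns.cnki.net/dm/manage/FileToText',
#                        data=export_data(ids),
#                        headers={'Referer': 'https://kns.cnki.net/'})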
def journal_nav_all(page: int = 1, page_size: int = 21):
model = dict(
searchStateJson=json.dumps(
{"StateID": "", "Platfrom": "", "QueryTime": "", "Account": "knavi", "ClientToken": "", "Language": "",
"CNode": {"PCode": "OYXNO5VW", "SMode": "", "OperateT": ""},
"QNode": {"SelectT": "", "Select_Fields": "", "S_DBCodes": "", "QGroup": [], "OrderBy": "OTA|DESC",
"GroupBy": "", "Additon": ""}}, ensure_ascii=False),
displaymode=1,
pageindex=page,
pagecount=page_size,
index='JSTMWT6S',
searchType='刊名(曾用刊名)',
clickName='',
switchdata=''
)
return model
def journal_article_by_year_issue(year_issue, page: int = 0, pcode: str = None):
"""
获取期刊每一期的文章
:param year_issue:
:param page:
:param pcode:
:return:
"""
if pcode is None:
pcode = 'CJFD,CCJD'
model = {
"yearIssue": year_issue,
"pageIdx": page,
"pcode": pcode
}
return model
def add_limit_2query_body(limit_query: Union[List[dict], dict], body_key: str, query_body: dict):
"""
把limit添加到检索的queryJson中
:param limit_query:
:param body_key:
:param query_body:
:return:
"""
# 判断组的key是否存在不存在的话添加一个组
if body_key not in {g["Key"] for g in query_body["QNode"]["QGroup"]}:
query_body["QNode"]["QGroup"].append({
"Key": body_key,
"Title": "",
"Logic": LogicEnum.AND.value,
"Items": [],
"ChildItems": []
})
    # walk the groups and attach the limit to the one whose key matches
for group in query_body["QNode"]["QGroup"]:
if group["Key"] == body_key:
if isinstance(limit_query, dict):
group["ChildItems"].append(limit_query)
elif isinstance(limit_query, list):
group["ChildItems"].extend(limit_query)
else:
raise ValueError("不支持的limit类型 \n%s" % limit_query)
break
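# Illustrative sketch: attaching a year limit to an existing query body. The
# helpers limit_year_range() / limit_updated_time() below do exactly this when
# they are given a base_query.
def _example_add_limit():
    body = refine_search("AF='湖南中医药大学'")
    add_limit_2query_body(limit_year_range(2021), "MutiGroup", body)
    return body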
def parse_retrieval(query: str):
"""
解析aside值拼接queryJson
:param query:
:return:
"""
def func(string: str):
stand = string[1:-1] # 去除左右的中文括号
title, value = stand.split("", maxsplit=1) # 分割 "作者单位:湖南中医药大学(模糊)" -> [作者单位, 湖南中医药大学(模糊)]
return title, value[:-4], value[-3:-1]
cond_list = re.split(r'(AND|NOT|OR)', query)
logic = 'AND'
content = cond_list[0]
yield logic, func(content)
for i in range(1, len(cond_list), 2):
        chunk = cond_list[i:i + 2]  # take the next (logic, condition) pair
logic, content = chunk
yield logic, func(content)
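# Illustrative sketch of the generator output, assuming the aside string keeps
# the full-width punctuation produced by the CNKI advanced-search page.
def _example_parse_retrieval():
    # expected: [('AND', ('作者单位', '湖南中医药大学', '模糊')),
    #            ('OR', ('作者单位', '湖南中医学院', '模糊'))]
    aside = '(作者单位:湖南中医药大学(模糊))OR(作者单位:湖南中医学院(模糊))'
    return list(parse_retrieval(aside))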
def add_search_word(search_content: str, base_query: dict = None):
"""
高级检索添加检索式
:param search_content: 用高级检索复制下来的aside字段
:param base_query:
:return:
"""
words_query = []
g = parse_retrieval(search_content)
i = 1
for logic, sequence in g:
field_name, word, way = sequence
input_select = "input[data-tipid=gradetxt-%(input_no)s]" % {"input_no": i}
logic_operator = LogicEnum[logic].value
q = {
"Key": input_select,
"Title": field_name,
"Logic": logic_operator,
"Items": [{
"Key": input_select,
"Title": field_name,
"Logic": logic_operator,
"Field": SearchFieldEnum(field_name).name,
"Operator": OperatorEnum[way].value,
"Value": word,
"Value2": ""
}],
"ChildItems": []
}
words_query.append(q)
i += 1
    # if a base query body was passed in, attach the parsed search terms to it automatically
if base_query:
add_limit_2query_body(words_query, "Subject", base_query)
return words_query
def limit_year_range(year: int, base_query: dict = None):
"""
添加年份筛选
:param year:
:param base_query:
:return:
"""
year = str(year)
ye_query = {
"Key": "YE",
"Title": "",
"Logic": 0,
"Items": [{
"Key": year,
"Title": "%s" % year,
"Logic": 1,
"Field": "YE",
"Operator": "DEFAULT",
"Value": year,
"Value2": "",
"Name": "YE",
"ExtendType": 0
}],
"ChildItems": []
}
if base_query:
add_limit_2query_body(ye_query, "MutiGroup", base_query)
return ye_query
def parse_updatedtime_symbol(symbol: str, today: str = None) -> tuple:
"""
从字符串解析时间范围
:param symbol:
:param today:
:return:
"""
if today and isinstance(today, str):
today = datetime.strptime(today, "%Y-%m-%d")
else:
today = datetime.now()
if symbol == "最近一周":
ago_day = today - timedelta(days=7)
elif symbol == "最近一月":
ago_day = today - timedelta(days=30)
elif symbol == "最近半年":
ago_day = today - timedelta(days=181)
elif symbol == "最近一年":
ago_day = today.replace(year=today.year-1)
elif symbol == "今年迄今":
ago_day = today.replace(month=1, day=1)
else:
ago_day = today
return ago_day.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d")
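# Illustrative sketch: with a fixed reference date the helper returns an ISO
# (start, end) pair; unknown labels fall back to (today, today).
def _example_parse_updatedtime_symbol():
    week = parse_updatedtime_symbol("最近一周", today="2025-05-13")   # ('2025-05-06', '2025-05-13')
    ytd = parse_updatedtime_symbol("今年迄今", today="2025-05-13")    # ('2025-01-01', '2025-05-13')
    return week, ytd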
def limit_updated_time(range_str: str, today: str = None, base_query: dict = None):
    """
    Build an updated-time (RT) filter.
    :param range_str: time-span label, see parse_updatedtime_symbol()
    :param today: reference date in "%Y-%m-%d"
    :param base_query: optional queryJson body to attach the filter to
    :return:
    """
    start_date, end_date = parse_updatedtime_symbol(range_str, today)
rt_query = {
"Key": ".tit-dropdown-box>.sort",
"Title": "",
"Logic": 0,
"Items": [
{
"Key": ".tit-dropdown-box>.sort",
"Title": "更新时间",
"Logic": 0,
"Field": "RT",
"Operator": 7,
"Value": start_date,
"Value2": end_date
}
],
"ChildItems": []
}
    # when base_query is given, attach the date-range filter to it automatically
if base_query:
add_limit_2query_body(rt_query, "ControlGroup", base_query)
return rt_query
def temp_refine_search(
query: str,
year: int = None,
updated_date: str = None,
resource_type: str = 'JOURNAL',
**kwargs
):
"""
构造queryJson字段的值
:param query: 检索式作者单位湖南中医药大学(模糊)OR作者单位湖南中医学院(模糊)
:param updated_date: 更新时间不限最近一周/一月/半年/一年今年迄今上一年度
:param year: 指定筛选的年份如果需要与updated_date参数同时使用需要在限制更新时间后再筛选
:param resource_type:
:param kwargs:
:return:
"""
_query = {
"Platform": "",
"Resource": ResourceType[resource_type].name,
"Classid": ResourceType[resource_type].value,
"Products": "",
"QNode": {
"QGroup": [
{
"Key": "Subject", "Title": "", "Logic": 0, "Items": [], "ChildItems": []
},
{
"Key": "ControlGroup", "Title": "", "Logic": 0, "Items": [], "ChildItems": []
}
]
},
"ExScope": "1",
"SearchType": 1,
"Rlang": "CHINESE",
"KuaKuCode": "",
"Expands": {},
"View": "changeDBOnlyFT",
"SearchFrom": 1
}
add_search_word(search_content=query, base_query=_query)
if updated_date and updated_date != "不限":
limit_updated_time(updated_date, base_query=_query)
if year:
limit_year_range(year=year, base_query=_query)
return _query
adv_refine_search = temp_refine_search
def temp_query_search(query_body, query: str = None, page: int = 1, page_size: int = 50,
sort: str = 'desc', sort_field: str = 'PT', updated_date: str = "不限", **kwargs):
"""
搜索请求body
:param query_body: 用来搜索的详细query, 与左侧导航body相同
:param query: aside/检索式字符串
:param page: 请求的页码
:param page_size: 每页的数量
:param sort: 排序方式, desc/asc
:param sort_field: 排序字段, PT(发表时间)/CF(被引)
:param updated_date: 默认不限
:return:
"""
page = str(page)
page_size = str(page_size)
if page == '1':
aside = query or ''
_query = {
"boolSearch": "true",
"QueryJson": json.dumps(query_body, ensure_ascii=False),
"pageNum": "1",
"pageSize": page_size,
'sortField': sort_field,
'sortType': sort,
"dstyle": "listmode",
"boolSortSearch": "false",
"aside": aside,
"searchFrom": "资源范围:学术期刊; 仅看有全文,中英文扩展; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; " % {"updated_date": updated_date},
"subject": "",
"language": "",
"uniplatform": "",
"CurPage": "1"
}
else:
_query = {
'boolSearch': "false",
'QueryJson': json.dumps(query_body, ensure_ascii=False),
'pageNum': page,
'pageSize': page_size,
'sortField': sort_field,
'sortType': sort,
'dstyle': 'listmode',
'boolSortSearch': "false",
'aside': '',
'searchFrom': '资源范围:学术期刊; 时间范围:更新时间:%(updated_date)s; 来源类别:全部期刊; ' % {"updated_date": updated_date},
"subject": "",
"language": "",
"uniplatform": ""
}
return _query
adv_query_search = temp_query_search
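# End-to-end sketch tying the advanced-search helpers together: build the
# queryJson body from an aside expression, then wrap it into the POST form
# fields. The expression, year and sort field below are placeholders.
def _example_adv_search_request():
    aside = '(作者单位:湖南中医药大学(模糊))OR(作者单位:湖南中医学院(模糊))'
    body = adv_refine_search(aside, year=2024, updated_date="不限")
    return adv_query_search(body, query=aside, page=1, sort_field="CF")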
class SearchPaperArgModel:
pass
class briefParam:
@staticmethod
def getDbCode():
return 'CFLS'
@staticmethod
def getPageSize(isSearch):
return 50
@staticmethod
def getCurPage():
return 1
@staticmethod
    def getSearchPaperArgModel(isSearch, cPage):
        dbCode = briefParam.getDbCode()
        pSize = briefParam.getPageSize(isSearch)
        cPage = cPage if cPage else briefParam.getCurPage()
        # partial port of the page's JS brief-param builder: dbCode / pSize /
        # cPage are computed but not yet folded into the returned model
        argModel = {
            'IsSearch': isSearch,
            'QueryJson': ''
        }
        return argModel
if __name__ == '__main__':
print(SearchTypeId.GROUP)
    print(add_search_word(
        '(作者单位:湖南中医药大学(模糊))OR(作者单位:湖南中医学院(模糊))OR(篇名:基于PINK1LETM1信号通路探讨何首乌苷减轻脑缺血再灌注损伤的作用机制(精确))'))

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# @Time : 2025/5/13 10:41
# @Author : zhaoxiangpeng
# @File : enum_cls.py
import enum
from datetime import timedelta
class ResourceType(enum.Enum):
    """Resource type (CNKI database code)"""
    JOURNAL = "YSTT4HG0"  # academic journals
    DISSERTATION = "LSTPFY1C"  # theses and dissertations
    CONFERENCE = "JUP3MUPD"  # conference papers
    NEWSPAPER = "MPMFIG1A"  # newspapers
    ALMANAC = "HHCPM1F8"
    BOOK = "EMRPGLPA"
    PATENT = "VUDIXAIY"
    STANDARD = "WQ0UVIAA"
    ACHIEVEMENTS = "BLZOG7CK"
class SearchTypeId(enum.Enum):
    """CNKI search type ids"""
    ADV = 1
    SIMPLE = 2
    AUTHOR = 3
    EXPERT = 4  # expert (professional) search
    SENTENCE = 5
    GROUP = 6
    PAGE = 7
    SORT = 8
    ABSTRACT = 9
    MORESENTENCE = 10
    HISTORY = 11
    SIZE = 12
    RESULT = 13
    ADVRESULT = 14
    EXPERTRESULT = 15
    AUTHORRESULT = 16
    SENRESULT = 17
    CROSSDBCHANGEDB = 18
    COMBOHISTORY = 19
class SearchFromId(enum.Enum):
SEARCH = 1
GROUPSEARCH = 2
RESULT = 3
PAGE = 4
SORT = 5
CHANGEDB = 6
DISPLAYMODEL = 7
NAVISEARCH = 8
HISTORY = 9
COMBOHISTORY = 10
CROSSDBCHANGEDB = 11
CHANGELANG = 12
GROUP = 99
class SearchFieldEnum(enum.Enum):
"""文献元数据字段枚举类"""
SU = "主题"
TKA = "篇关摘"
TI = "篇名"
KY = "关键词"
AB = "摘要"
CO = "小标题"
FT = "全文"
AU = "作者"
FI = "第一作者"
RP = "通讯作者"
AF = "作者单位"
LY = "期刊名称"
RF = "参考文献"
FU = "基金"
CLC = "中图分类号"
SN = "ISSN"
CN = "CN"
DOI = "DOI"
QKLM = "栏目信息"
FAF = "第一单位"
CF = "被引频次"
class OperatorEnum(enum.Enum):
模糊 = "FUZZY"
精确 = "DEFAULT"
class OperatorTypeEnum(enum.Enum):
DEFAULT = 0
TOPRANK = 1
FUZZY = 2
GT = 3
GE = 4
LT = 5
LE = 6
BETWEEN = 7
FREQUENCY = 8
PREFIX = 9
SUFFIX = 10
CONTAINS = 11
NEAR = 12
SENTENCE = 13
IS = 14
FUZZYFREQUENCY = 15
class LogicEnum(enum.Enum):
AND = 0
OR = 1
NOT = 2
class UpdatedTimeEnum(enum.Enum):
    """
    Fixed spans for the "recently updated" labels
    """
    最近一周 = timedelta(days=7)
    最近一月 = timedelta(days=30)
    最近半年 = timedelta(days=180)
    最近一年 = timedelta(days=365)
    今年迄今 = timedelta(days=180)  # approximation; year-to-date is not a fixed span (see parse_updatedtime_symbol in cnki_model)
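# Illustrative sketch: turning one of the fixed spans above into a concrete
# (start, end) date pair relative to the machine clock.
def _example_updated_time_range():
    from datetime import datetime
    end = datetime.now()
    start = end - UpdatedTimeEnum.最近一周.value
    return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")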

@ -0,0 +1,90 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from __future__ import annotations
from itemadapter import ItemAdapter
class ScienceArticlCnkiPipeline:
def process_item(self, item, spider):
return item
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Tuple, Union
from pymongo import MongoClient
from itemadapter import ItemAdapter
from pymongo.errors import (
DuplicateKeyError,
BulkWriteError
)
from science_article_cnki.db_utils.mongo import MongoDBUtils, update_document, build_update_query
if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class MongoPipeline(MongoDBUtils):
def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
super().__init__(mongo_uri, mongo_db)
self.stats: StatsCollector = stats
self.insert_failure_update_enable = True
@classmethod
def from_crawler(cls, crawler: Crawler):
return cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
stats=crawler.stats
)
def open_spider(self, spider):
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider):
        # determine the item type (target collection name)
adapter = ItemAdapter(item)
item_type = self._get_item_type(item)
collection = self.db.get_collection(item_type)
d = adapter.asdict()
try:
insert_result = collection.insert_one(d)
self.stats.inc_value("item2db_inserted/{}".format(item_type))
except DuplicateKeyError as duplicate_error:
if self.insert_failure_update_enable:
write_error = duplicate_error.details
key_pattern = write_error.get('keyPattern')
key_value = write_error.get('keyValue')
logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
d.pop("_id", None)
                # drop the unique-key fields themselves, then upsert the rest
                for k in key_pattern:
                    d.pop(k, None)
                collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
self.stats.inc_value("item2db_updated/{}".format(item_type))
except Exception:
raise
return item
def close_spider(self, spider):
self.client.close()
@staticmethod
def _get_item_type(item) -> str:
"""获取Item类型"""
if hasattr(item, '__tablename__'):
return item.__class__.__tablename__
return 'items_null_table'
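# The DuplicateKeyError branch above only fires when the target collection has
# a unique index. A one-off helper sketch; the collection name and the
# "third_id" key are assumptions, not taken from this project.
def _ensure_unique_index(client: MongoClient, db_name: str = "science2",
                         collection_name: str = "cnki_cited_number") -> None:
    # e.g. make repeated crawls of the same CNKI record hit the upsert path
    client[db_name][collection_name].create_index("third_id", unique=True)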

@ -0,0 +1,105 @@
# Scrapy settings for science_article_cnki project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = "science_article_cnki"
SPIDER_MODULES = ["science_article_cnki.spiders"]
NEWSPIDER_MODULE = "science_article_cnki.spiders"
ADDONS = {}
# Crawl responsibly by identifying yourself (and your website) on the user-agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# Concurrency and throttling settings
#CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 1
DOWNLOAD_DELAY = 3
# Disable cookies (enabled by default)
COOKIES_ENABLED = True
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
# "Accept-Language": "en",
#}
SEARCH_REQUEST_HEADERS = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Cookie': 'Hm_lvt_dcec09ba2227fd02c55623c1bb82776a=1739256689; UM_distinctid=197b0769b48ea3-0de0b4b2dd761f-26001051-1fa400-197b0769b49cc6; Ecp_ClientId=e250627180800765334; Ecp_ClientIp=111.186.53.36; cnkiUserKey=1b8e7dbe-3c98-864f-2b80-84b544af32af; _c_WBKFRo=UO8UFAxWLjMjlOxhuKvmtkZ4yYaXr8dPZXuhVFea; Ecp_loginuserbk=SJTU; tfstk=g5GqYEZ0ZId4NHSWG0FNzQCb6QNYs5-QjfZ_SV0gloqDDdFa7uoTCSMjSA5ZJuEOhdn6_lmxYPZ0DxMNb0nUXt99nAPZ2q5jhfuO_P0iXEE6kLgxk5FMAHTBOq3vhen9f3NMS4V_773PuGuxk5Q-60hJAqQN2mSLS5mgZz4gS540ItYPZPqliPf0SgYzWuVgSrX0ZT4_uGb0Sc0kzPEuolmgsUPu2PVgjcViG50mS_zQnU-thdfV8NPaxqqPs67Lu-cB9u5Mabzqzugc-1fiaryqZpcfbM2jI2eKGqONwSgEE74qjBx0ex0r_Jh9Csg0ZoPxa-bMXocxSfPYTNAmzSr4KbwXO1mnzVDQUbTH9SP0mANx5w-jzjojkbu1STV4GYyEgWAdmlMS8fzZ6hdrYqDnjASP1GUobXlt3GXanzUzAU8z4y3oBzrYp_6OB8VLzkTblOBTnzUzAU8PBOeu2zrBlr1..; Ecp_session=1; SID_sug=018104; knsLeftGroupSelectItem=; dsorders=CF; dsortypes=cur%20DESC; knsadv-searchtype=%7B%22BLZOG7CK%22%3A%22gradeSearch%2CmajorSearch%22%2C%22MPMFIG1A%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22T2VC03OH%22%3A%22gradeSearch%2CmajorSearch%22%2C%22JQIRZIYA%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22S81HNSV3%22%3A%22gradeSearch%22%2C%22YSTT4HG0%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22ML4DRIDX%22%3A%22gradeSearch%2CmajorSearch%22%2C%22WQ0UVIAA%22%3A%22gradeSearch%2CmajorSearch%22%2C%22VUDIXAIY%22%3A%22gradeSearch%2CmajorSearch%22%2C%22LIQN9Z3G%22%3A%22gradeSearch%22%2C%22NN3FJMUV%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22LSTPFY1C%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22HHCPM1F8%22%3A%22gradeSearch%2CmajorSearch%22%2C%22OORPU5FE%22%3A%22gradeSearch%2CmajorSearch%22%2C%22WD0FTY92%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22BPBAFJ5S%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22EMRPGLPA%22%3A%22gradeSearch%2CmajorSearch%22%2C%22PWFIRAGL%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22U8J8LYLV%22%3A%22gradeSearch%2CmajorSearch%22%2C%22R79MZMCB%22%3A%22gradeSearch%22%2C%22J708GVCE%22%3A%22gradeSearch%2CmajorSearch%22%2C%228JBZLDJQ%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%22HR1YT1Z9%22%3A%22gradeSearch%2CmajorSearch%22%2C%22JUP3MUPD%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22NLBO1Z6R%22%3A%22gradeSearch%2CmajorSearch%22%2C%22RMJLXHZ3%22%3A%22gradeSearch%2CmajorSearch%2CsentenceSearch%22%2C%221UR4K4HZ%22%3A%22gradeSearch%2CmajorSearch%2CauthorSearch%2CsentenceSearch%22%2C%22NB3BWEHK%22%3A%22gradeSearch%2CmajorSearch%22%2C%22XVLO76FD%22%3A%22gradeSearch%2CmajorSearch%22%7D; Ecp_IpLoginFail=25121149.65.252.186; SID_kns_new=kns018106; SID_restapi=kns018110; KNS2COOKIE=1765437722.656.114388.232155|b25e41a932fd162af3b8c5cff4059fc3; dblang=both; createtime-advInput=2025-12-11%2015%3A22%3A21; searchTimeFlags=1',
'Origin': 'https://kns.cnki.net',
'Referer': 'https://kns.cnki.net/kns8s/AdvSearch?crossids=YSTT4HG0%2CLSTPFY1C%2CJUP3MUPD%2CMPMFIG1A%2CWQ0UVIAA%2CBLZOG7CK%2CPWFIRAGL%2CEMRPGLPA%2CNLBO1Z6R%2CNN3FJMUV',
'User-Agent': USER_AGENT,
}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
# "science_article_cnki.middlewares.ScienceArticlCnkiSpiderMiddleware": 543,
#}
RETRY_ENABLED = True
RETRY_TIMES = 2  # retry each failed request up to 2 times (3 attempts total)
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 400, 403, 404]  # includes some common client-side error codes as well
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550
# "org_news.middlewares.OrgNewsDownloaderMiddleware": 543,
}
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
# "science_article_cnki.middlewares.ScienceArticlCnkiDownloaderMiddleware": 543,
#}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
# "scrapy.extensions.telnet.TelnetConsole": None,
#}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
# "science_article_cnki.pipelines.ScienceArticlCnkiPipeline": 300,
#}
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"
MONGO_URI = "mongodb://science-dev:kcidea1509!%25)(@101.43.239.105:27017/?authSource=science&directConnection=true"
MONGO_DATABASE = 'science2'

@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

@ -0,0 +1,101 @@
from __future__ import annotations
import math
from copy import deepcopy
from datetime import datetime
from typing import TYPE_CHECKING, Any, Self
import scrapy
from science_article_cnki.items import CnkiCitedNumberItem
from science_article_cnki.utils.tools import str2int
from science_article_cnki.models import cnki_model as model
from science_article_cnki.configs import cnki as config
if TYPE_CHECKING:
from scrapy.crawler import Crawler
class CnkiCitedNumberSpider(scrapy.Spider):
name = "cnki_cited_number"
custom_settings = dict(
DEFAULT_REQUEST_HEADERS={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en",
},
DOWNLOADER_MIDDLEWARES={
"science_article_cnki.middlewares.CnkiSearchHeadersDownloaderMiddleware": 540,
},
ITEM_PIPELINES={
"science_article_cnki.pipelines.MongoPipeline": 300,
# "science_article_cnki.pipelines.verify_data.VerifyDataIntegrity": 400,
},
LOG_LEVEL="INFO"
)
@classmethod
def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
        # custom construction logic could go here,
        # e.g. read the query parameters from the database when none are passed in
return super().from_crawler(crawler, *args, **kwargs)
def __init__(self, query: str = None, resource_type: str = "JOURNAL", query_condition: dict = None, **kwargs: Any):
super().__init__(**kwargs)
self.query = query
self.resource_type = resource_type
self.sort_field = 'CF'
        self.query_condition = query_condition or {}  # e.g. {'year': '2021'}
self.page_size = 50
async def start(self):
m = dict(query=self.query, resource_type=self.resource_type, page=1, sort_field=self.sort_field, **self.query_condition)
query_body = model.adv_refine_search(**m)
search_param = model.adv_query_search(query_body, **m)
yield scrapy.FormRequest(
url=config.CNKI_ADV_SEARCH_API, method="POST", formdata=search_param, meta=m
)
def parse(self, response, **kwargs):
meta = response.meta
        # ------------------------- work out how many result pages there are -------------------------
        # extract the total number of search hits
        total_prm = response.xpath('//span[@class="pagerTitleCell"]/em/text()').get()
        if not total_prm:
            self.logger.warning(
                "no result count found for query %s, response body: %s",
                meta.get('query'), response.body)
            return
        total = str2int(total_prm.replace(',', ''))  # strip thousands separators and convert to int
        # total number of pages
max_page = math.ceil(total / self.page_size)
meta['max_page'] = max_page
batch_time = datetime.now()
tr_nodes = response.xpath('//div[@id="gridTable"]//table[@class="result-table-list"]/tbody/tr')
for tr_node in tr_nodes:
            third_id = tr_node.xpath('./td[@class="operat"]/a[@class="icon-collect"]/@data-filename').get()  # third-party (CNKI) record id
            cited_str = tr_node.xpath('./td[@class="quote"]/span/a/text()').get()  # citation count
if third_id and cited_str:
cited_item = CnkiCitedNumberItem()
cited_item['third_id'] = third_id
cited_item['cited'] = str2int(cited_str, 0)
cited_item['updated_at'] = batch_time
yield cited_item
        # request the next page only while pages remain
        if meta['page'] >= max_page:
            return
        meta_copy: dict = deepcopy(meta)
        meta_copy['page'] += 1
        query_body = model.adv_refine_search(**meta_copy)
        search_param = model.adv_query_search(query_body, **meta_copy)
        yield scrapy.FormRequest(
            url=config.CNKI_ADV_SEARCH_API, method="POST", formdata=search_param,
            meta=meta_copy
        )
if __name__ == '__main__':
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
process = CrawlerProcess(get_project_settings())
task_params = dict()
    task_params.setdefault('query', '(作者单位:西安建筑科技大学(模糊))')
task_params.setdefault('query_condition', {'year': '2026'})
process.crawl(CnkiCitedNumberSpider, **task_params)
    process.start()  # blocks until all crawls are finished

@ -0,0 +1,10 @@
import scrapy
class ExampleSpider(scrapy.Spider):
name = "example"
allowed_domains = ["example.com"]
start_urls = ["https://example.com"]
def parse(self, response):
pass

@ -0,0 +1,17 @@
from typing import List, Tuple
from datetime import datetime
def str2int(val, replace=0):
    """Convert a value to int, falling back to `replace` on failure."""
    try:
        val = int(val)
    except (ValueError, TypeError):
        val = replace
    return val
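# Illustrative behaviour: non-numeric or missing input falls back to `replace`.
def _example_str2int():
    assert str2int("1024") == 1024
    assert str2int("约1,200", replace=0) == 0   # ValueError -> fallback
    assert str2int(None, replace=-1) == -1      # TypeError -> fallback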
def get_today_date(fmt: str = "%Y-%m-%d"):
return datetime.today().strftime(fmt)

@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = science_article_cnki.settings
[deploy]
#url = http://localhost:6800/
project = science_article_cnki