add: add DingTalk notifications to scrapy

main
zhaoxiangpeng 2 months ago
parent f977b8ad51
commit ea68319ee6

.gitignore (vendored): 1 addition

@@ -160,3 +160,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
/science_article_add/.idea/

@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/23 16:30
# @Author : deepseek
# @File : dingtalk.py
# extensions/dingtalk_extension.py
import json
import logging
import requests
from scrapy import signals
from scrapy.exceptions import NotConfigured
class DingTalkExtension:
"""钉钉机器人通知扩展"""
def __init__(self, webhook_url, spider_start_message=None, spider_closed_message=None):
self.webhook_url = webhook_url
self.spider_start_message = spider_start_message
self.spider_closed_message = spider_closed_message
self.stats = None
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
# Read the DingTalk webhook URL from settings
webhook_url = crawler.settings.get('DINGTALK_WEBHOOK_URL')
if not webhook_url:
raise NotConfigured('DINGTALK_WEBHOOK_URL must be set')
# Optional custom message templates
start_msg = crawler.settings.get('DINGTALK_START_MESSAGE')
closed_msg = crawler.settings.get('DINGTALK_CLOSED_MESSAGE')
ext = cls(webhook_url, start_msg, closed_msg)
# Register signal handlers
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
return ext
def spider_opened(self, spider):
"""爬虫开始时发送通知"""
self.stats = spider.crawler.stats
message = self.spider_start_message or self._get_default_start_message(spider)
self._send_dingtalk_message(message, spider)
def spider_closed(self, spider, reason):
"""爬虫结束时发送通知"""
message = self.spider_closed_message or self._get_default_closed_message(spider, reason)
self._send_dingtalk_message(message, spider)
def spider_error(self, failure, response, spider):
"""爬虫错误时发送通知"""
error_message = f"🚨 爬虫错误\n爬虫: {spider.name}\nURL: {response.url}\n错误: {str(failure.value)}"
self._send_dingtalk_message(error_message, spider, is_error=True)
def _get_default_start_message(self, spider):
"""默认开始消息模板"""
return f"🚀 爬虫启动通知\n**爬虫名称**: {spider.name}\n**开始时间**: {self._get_current_time()}\n**状态**: 开始运行"
def _get_default_closed_message(self, spider, reason):
"""默认结束消息模板"""
stats = self.stats
# 获取统计信息
item_scraped_count = stats.get_value('item_scraped_count', 0)
response_count = stats.get_value('response_received_count', 0)
error_count = stats.get_value('log_count/ERROR', 0)
finish_reason = self._get_reason_display(reason)
message = f"""📊 爬虫完成通知
**爬虫名称**: {spider.name}
**完成时间**: {self._get_current_time()}
**完成原因**: {finish_reason}
**采集统计**:
- 采集项目: {item_scraped_count}
- 请求响应: {response_count}
- 错误数量: {error_count}
**状态**: {'✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束'}"""
return message
def _get_reason_display(self, reason):
"""获取完成原因的可读描述"""
reason_map = {
'finished': '正常完成',
'shutdown': '手动关闭',
'cancelled': '被取消',
}
return reason_map.get(reason, reason)
def _get_current_time(self):
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def _send_dingtalk_message(self, message, spider, is_error=False):
"""发送钉钉消息"""
try:
headers = {'Content-Type': 'application/json'}
# Build the message payload
data = {
"msgtype": "markdown",
"markdown": {
"title": f"爬虫通知 - {spider.name}",
"text": message
},
"at": {
"isAtAll": is_error # 如果是错误,@所有人
}
}
response = requests.post(
self.webhook_url,
data=json.dumps(data),
headers=headers,
timeout=10
)
if response.status_code == 200:
self.logger.info(f"钉钉通知发送成功: {spider.name}")
else:
self.logger.error(f"钉钉通知发送失败: {response.status_code} - {response.text}")
except Exception as e:
self.logger.error(f"发送钉钉通知时出错: {e}")

@@ -0,0 +1,21 @@
import scrapy
class WosItem(scrapy.Item):
# define the fields for your item here like:
third_id = scrapy.Field()
updated_at = scrapy.Field()
class WosArticleItem(WosItem):
"""
WOS publication item
"""
exported = scrapy.Field()
class WosCitedNumberItem(WosItem):
"""发文被引量item"""
third_id = scrapy.Field()
cited = scrapy.Field()
updated_at = scrapy.Field()
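For reference, this is roughly how the pipeline below ends up consuming these items (the UT string and count are made-up example values):

from datetime import datetime
from itemadapter import ItemAdapter

item = WosCitedNumberItem(third_id="WOS:000000000000001", cited=12, updated_at=datetime.now())
ItemAdapter(item).asdict()
# -> {'third_id': 'WOS:000000000000001', 'cited': 12, 'updated_at': datetime(...)}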

@@ -0,0 +1,38 @@
# pipelines.py
import pymongo
from itemadapter import ItemAdapter
from science_article_add.items.wos import WosCitedNumberItem
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_data')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# Route items to different collections depending on the item type
if isinstance(item, WosCitedNumberItem):
collection_name = 'relation_cited_number_wos'
else:
collection_name = 'relation_cited_number_other'
# Insert the document
self.db[collection_name].insert_one(dict(adapter))
return item
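For the pipeline to run it still needs to be registered in settings.py; a minimal sketch, assuming the module path is science_article_add.pipelines (the MONGO_URI value is a placeholder):

ITEM_PIPELINES = {
    "science_article_add.pipelines.MongoDBPipeline": 300,
}
MONGO_URI = "mongodb://localhost:27017"   # placeholder, point at the real MongoDB instance
MONGO_DATABASE = "scrapy_data"            # optional; this is already the fallback default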

@@ -92,3 +92,25 @@ REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"
# DingTalk robot configuration
DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=1252fe1ef63e95ced11ac87a01e9978670e82036a516c558e524f89e11513f9f'
DINGTALK_SECRET = 'SECe77fe7cd6c0dbfcdd9ebe6ba1941ddc376be86ca717e9d68bb177b7eded71091'
# Custom message templates (optional)
DINGTALK_START_MESSAGE = "🚀 Spider started!\n**Spider**: {spider_name}\n**Time**: {time}"
DINGTALK_CLOSED_MESSAGE = "✅ Spider finished!\n**Spider**: {spider_name}\n**Items**: {item_count}"
# Enable the extension
EXTENSIONS = {
'scrapy_example.extensions.dingtalk_extension.DingTalkExtension': 500,
# 'scrapy_example.extensions.advanced_dingtalk_extension.AdvancedDingTalkExtension': 100,
}
# Enable/disable specific notifications
DINGTALK_ENABLE_START = True
DINGTALK_ENABLE_FINISH = True
DINGTALK_ENABLE_ERROR = True
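The two templates above carry {spider_name}, {time} and {item_count} placeholders, and three DINGTALK_ENABLE_* flags are declared, but the extension added in this commit sends the templates verbatim and never reads the flags. A sketch of how spider_opened could consume both, assuming str.format-style placeholders (spider_closed would do the same, passing item_count=stats.get_value('item_scraped_count', 0)):

def spider_opened(self, spider):
    """Send a start notification, honouring the DINGTALK_ENABLE_START flag."""
    if not spider.crawler.settings.getbool('DINGTALK_ENABLE_START', True):
        return
    self.stats = spider.crawler.stats
    if self.spider_start_message:
        message = self.spider_start_message.format(
            spider_name=spider.name, time=self._get_current_time())
    else:
        message = self._get_default_start_message(spider)
    self._send_dingtalk_message(message, spider)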

@@ -1,9 +1,12 @@
import math
from datetime import datetime
from urllib.parse import urlencode
from copy import deepcopy
import scrapy
from scrapy.http.response.json import JsonResponse
from science_article_add.items import WosLiteAddItem
from science_article_add.items.wos import WosCitedNumberItem
from science_article_add.models import wos_model as model
from science_article_add.configs import wos as config
from science_article_add.utils import tools
@@ -36,103 +39,53 @@ class WosLatestIncrementSpider(scrapy.Spider):
self.query_content = task_obj['content']
self.query_condition = task_obj['task_condition']
self.first_page = task_obj.get('first_page', 1)
async def start(self):
full_query = self.query_content
if self.query_condition is not None:
full_query = '%(query)s %(condition)s' % {'query': self.query_content, 'condition': self.query_condition}
yield scrapy.Request(url=config.WOS_LITE_QUERY_FIRST_API + '?' + urlencode(model.lite_base_model(usr_query=full_query)),
dont_filter=True,
meta={'query': full_query, 'PAGE': 1})
meta = dict(q=full_query, page=self.first_page, limit=50, detail="short")
yield scrapy.Request(url=config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(model.starter_documents_get(**meta)),
meta=meta)
async def parse(self, response, **kwargs):
async def parse(self, response: JsonResponse, **kwargs):
meta = response.meta
request = response.request
request: scrapy.Request = response.request
task_query_id = self.query_id
task_org_id = self.org_id
task_record_id = self.record_id
self.logger.debug('%s: %s' % ('parse_query_api', meta))
if response.status != 200:
self.logger.warning("""
响应异常
状态码: %s
响应内容: %s""" % (response.status, response.text))
req_meta = request.meta
resp_result = response.json()
query_result = resp_result.get('QueryResult')
datas = resp_result.get('Data')
query_id = query_result.get('QueryID')
records_found = query_result.get('RecordsFound')
max_page = math.ceil(records_found / 100)
meta_copy: dict = deepcopy(meta)
meta_copy.update({'MAX_PAGE': max_page})
meta_copy.update({'TOTAL': records_found})
meta_copy.update({'QUERY_ID': query_id})
meta_copy.update({'next_page': meta['PAGE'] + 1})
meta_copy.update({'PAGE': meta['PAGE'] + 1})
meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
for data in datas:
add_item = WosLiteAddItem()
# Prefer the year explicitly set for this task when storing
to_db_year = meta.get("search_year")
if not to_db_year:
publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
if publish_year:
to_db_year = tools.str2int(publish_year[0])
add_item["third_id"] = data.get('UT')
add_item["year"] = to_db_year
add_item["query_ids"] = [task_query_id]
add_item["school_ids"] = [task_org_id]
add_item["task_ids"] = [task_record_id]
yield add_item
yield scrapy.Request(
url=config.WOS_LITE_QUERY_API + f'/{query_id}',
body=model.lite_query_model(**meta_copy),
meta=meta_copy,
callback=self.parse_query_api,
)
async def parse_query_api(self, response, **kwargs):
meta = response.meta
task_query_id = self.query_id
task_org_id = self.org_id
task_record_id = self.record_id
self.logger.debug("""
%s
%s""" % ('parse_query_api', meta))
resp_result = response.json()
query_id = meta.get('QUERY_ID')
datas = resp_result.get('Data', [])
if len(datas):
for data in datas:
add_item = WosLiteAddItem()
# Prefer the year explicitly set for this task when storing
to_db_year = meta.get("search_year")
if not to_db_year:
publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
if publish_year:
to_db_year = tools.str2int(publish_year[0])
add_item["third_id"] = data.get('UT')
add_item["year"] = to_db_year
add_item["query_ids"] = [task_query_id]
add_item["school_ids"] = [task_org_id]
add_item["task_ids"] = [task_record_id]
yield add_item
else:
# Build the SQL-style condition used to record the result for this task
update_state_condition = ('batch_date = "%(batch_date)s"\n'
'query_id = %(query_id)s\n'
'task_condition = "%(task_condition)s"') % self.task_obj
self.logger.warning('No more data\n%s' % update_state_condition)
return
if meta['first_record'] < meta['TOTAL']:
meta_copy = deepcopy(meta)
meta_copy.update({'next_page': meta['PAGE'] + 1})
meta_copy.update({'PAGE': meta['PAGE'] + 1})
meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
yield scrapy.Request(
url=config.WOS_LITE_QUERY_API + f'/{query_id}',
body=model.lite_query_model(**meta_copy),
meta=meta_copy,
callback=self.parse_query_api,
)
metadata: dict = resp_result.get("metadata")
current_page = metadata.get("page")
records_found = metadata.get('total')
max_page = req_meta.get("MAX_PAGE")
if req_meta.get("page") == self.first_page:
self.logger.info("""
检索式: %s
检索到结果: %s""" % (req_meta.get("q"), records_found))
max_page = req_meta["MAX_PAGE"] = math.ceil(records_found / config.WOS_STARTER_PER_PAGE_LIMIT)
batch_time = datetime.now()
hits: list = resp_result.get("hits")
for record in hits:
cited_num = tools.get_list_key(array=record.get("citations"), target="count", condition=("db", "WOS"))
if cited_num:
cited_item = WosCitedNumberItem(third_id=record.get("uid"), cited=cited_num, updated_at=batch_time)
yield cited_item
yield WosIdRelationItem(third_id=record.get("uid"), query_ids=[task_query_id], updated_at=batch_time)
if current_page < max_page:
meta_copy: dict = deepcopy(req_meta)
meta_copy.update({'page': meta_copy['page'] + 1})
# scrapy.Request does not accept arbitrary keyword arguments; the task ids are
# re-read from self at the top of parse(), so nothing else needs to be passed here.
yield scrapy.Request(config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(model.starter_documents_get(**meta_copy)),
meta=meta_copy)
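A worked example of the paging logic in parse(), assuming config.WOS_STARTER_PER_PAGE_LIMIT is 50 (matching the limit=50 set in start()):

import math

records_found = 1234                             # metadata['total'] reported on the first page
per_page = 50                                    # assumed value of WOS_STARTER_PER_PAGE_LIMIT
max_page = math.ceil(records_found / per_page)   # 25
# Page 1 is requested in start(); parse() keeps yielding the next request while
# current_page < max_page, so pages 2..25 are fetched one after another.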

@@ -1,3 +1,6 @@
from typing import List, Tuple
def str2int(val, replace=0):
try:
val = int(val)
@@ -5,4 +8,20 @@ def str2int(val, replace=0):
val = replace
except TypeError:
val = replace
return val
return val
def get_list_key(array: List[dict], target: str, condition: Tuple[str, str]):
"""
给定一个list [{key: val1, target: val2}, {key: val1, target: val2}]
根据condition(key=val)返回第一个target对应的值
:param target:
:param condition:
:param array:
:return:
"""
n, v = condition
for dic in array:
if dic.get(n) == v:
return dic.get(target)
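Usage sketch for get_list_key(), mirroring how the spider reads the WOS citation count (the citations layout is an assumption based on that call site):

citations = [
    {"db": "WOS", "count": 12},
    {"db": "BIOSIS", "count": 3},
]
get_list_key(array=citations, target="count", condition=("db", "WOS"))     # -> 12
get_list_key(array=citations, target="count", condition=("db", "SCIELO"))  # -> None (no match)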
