You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

241 lines
8.2 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# @Time : 2025/10/23 16:30
# @Author : deepseek
# @File : dingtalk.py
# extensions/dingtalk_extension.py
import json
import time
import logging
import requests
from scrapy import signals
from scrapy.exceptions import NotConfigured
# Message templates used by DingTalkExtension. Each one is filled in with
# %-style dict formatting and sent as the "text" of a DingTalk markdown message.

# Start notification. Placeholders: spider_name, started_time.
SPIDER_START_MSG = """🚀 爬虫启动通知\n
**爬虫名称**: %(spider_name)s\n
**开始时间**: %(started_time)s\n
**状态**: 开始运行"""
# NOTE: an earlier SPIDER_CLOSED_MSG variant (without the org_name,
# task_condition and record_id placeholders) used to be kept here as
# commented-out code; the active template is the SPIDER_CLOSED_MSG below.

# Error notification. Placeholders: spider_name, url, err_msg.
SPIDER_ERROR_MSG = """🚨 爬虫错误\n
**爬虫名称**: %(spider_name)s\n
**URL**: %(url)s\n
**错误**: %(err_msg)s"""
# Close notification. Placeholders: spider_name, org_name, task_condition,
# record_id, finished_time, finish_reason, item_scraped_count,
# response_count, error_count, state.
SPIDER_CLOSED_MSG = """📊 爬虫完成通知\n
**爬虫名称**: %(spider_name)s\n
**机构名称**: %(org_name)s\n
**任务条件**: %(task_condition)s\n
**任务ID**: %(record_id)s\n
**完成时间**: %(finished_time)s\n
**完成原因**: %(finish_reason)s\n
**采集统计**:\n
- 采集项目: %(item_scraped_count)s
- 请求响应: %(response_count)s
- 错误数量: %(error_count)s\n
**状态**: %(state)s"""
class DingTalkExtension:
    """Scrapy extension that sends spider notifications to a DingTalk robot.

    Depending on the ``DINGTALK_ENABLE_START`` / ``DINGTALK_ENABLE_FINISH`` /
    ``DINGTALK_ENABLE_ERROR`` settings, the extension subscribes to the
    ``spider_opened``, ``spider_closed`` and ``spider_error`` signals and
    posts a markdown message to the webhook given by ``DINGTALK_WEBHOOK_URL``.
    When ``DINGTALK_SECRET`` is set, every request URL is signed.
    """

    def __init__(
        self,
        crawler,
        webhook_url=None, secret=None,
        start_msg=None, closed_msg=None,
        **kwargs
    ):
        """Store the configuration.

        Args:
            crawler: The Scrapy crawler this extension is attached to.
            webhook_url: DingTalk robot webhook URL.
            secret: Optional signing secret; signing is skipped when falsy.
            start_msg: %-style template for the start message; falls back to
                ``SPIDER_START_MSG`` when falsy.
            closed_msg: %-style template for the close message; falls back
                to ``SPIDER_CLOSED_MSG`` when falsy.
            **kwargs: ``enable_start_notify``, ``enable_finish_notify`` and
                ``enable_error_notify`` booleans (each defaults to False).
        """
        self.crawler = crawler
        self.stats = crawler.stats
        self.webhook_url = webhook_url
        self.secret = secret
        self.spider_start_message = start_msg or SPIDER_START_MSG
        self.spider_closed_message = closed_msg or SPIDER_CLOSED_MSG
        self.enable_start_notify = kwargs.get("enable_start_notify", False)
        self.enable_finish_notify = kwargs.get("enable_finish_notify", False)
        self.enable_error_notify = kwargs.get("enable_error_notify", False)
        self.logger = logging.getLogger(__name__)

    @classmethod
    def from_crawler(cls, crawler):
        """Build the extension from crawler settings and connect its signals.

        Raises:
            NotConfigured: If ``DINGTALK_WEBHOOK_URL`` is missing or empty.
        """
        settings = crawler.settings
        webhook_url = settings.get('DINGTALK_WEBHOOK_URL')
        if not webhook_url:
            raise NotConfigured('DINGTALK_WEBHOOK_URL must be set')

        ext = cls(
            crawler=crawler,
            webhook_url=webhook_url,
            secret=settings.get('DINGTALK_SECRET'),
            # Optional custom message templates.
            start_msg=settings.get('DINGTALK_START_MESSAGE', SPIDER_START_MSG),
            closed_msg=settings.get('DINGTALK_CLOSED_MESSAGE', SPIDER_CLOSED_MSG),
            enable_start_notify=settings.getbool('DINGTALK_ENABLE_START', False),
            enable_finish_notify=settings.getbool('DINGTALK_ENABLE_FINISH', False),
            enable_error_notify=settings.getbool('DINGTALK_ENABLE_ERROR', False),
        )

        # Only subscribe to the signals whose notifications are enabled.
        if ext.enable_start_notify:
            crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        if ext.enable_finish_notify:
            crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        if ext.enable_error_notify:
            crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
            crawler.signals.connect(ext.item_error, signal=signals.item_error)
        return ext

    def spider_opened(self, spider):
        """Send the start notification when the spider opens."""
        self.stats = spider.crawler.stats
        message = self._get_default_start_message(spider)
        self._send_dingtalk_message(message, spider)

    def spider_closed(self, spider, reason):
        """Send the summary notification when the spider closes."""
        message = self._get_default_closed_message(spider, reason)
        self._send_dingtalk_message(message, spider)

    def spider_error(self, failure, response, spider):
        """Send an error notification (mentioning everyone) for a spider callback failure."""
        message = SPIDER_ERROR_MSG % {
            "spider_name": spider.name,
            "url": response.url,
            "err_msg": str(failure.value),
        }
        self._send_dingtalk_message(message, spider, is_error=True)

    def item_error(self, failure, response, spider):
        """Handle the ``item_error`` signal; intentionally a no-op."""
        pass

    def _get_default_start_message(self, spider):
        """Render the start template with the spider name and current time."""
        return self.spider_start_message % {
            "spider_name": spider.name,
            "started_time": self._get_current_time(),
        }

    @staticmethod
    def _task_field(task_obj, key, default="-"):
        """Return ``task_obj[key]``, or ``default`` when it cannot be read."""
        try:
            return task_obj[key]
        except (KeyError, IndexError, TypeError):
            # TypeError also covers task_obj being None / not subscriptable.
            return default

    def _get_default_closed_message(self, spider, reason):
        """Render the close template from crawl stats and spider task info.

        ``task_obj``, ``record_id`` and ``get_records_found()`` are optional
        on the spider: missing task fields render as ``-`` and the item count
        falls back to the ``item_scraped_count`` stat, so a spider without
        them still produces a notification instead of raising.
        """
        stats = self.stats
        response_count = stats.get_value('response_received_count', 0)
        error_count = stats.get_value('log_count/ERROR', 0)

        # Prefer the spider's own record counter when it provides one.
        records_found = getattr(spider, 'get_records_found', None)
        if callable(records_found):
            item_count = records_found()
        else:
            item_count = stats.get_value('item_scraped_count', 0)

        task_obj = getattr(spider, 'task_obj', None)
        return self.spider_closed_message % {
            "spider_name": spider.name,
            "org_name": self._task_field(task_obj, 'org_name'),
            "task_condition": self._task_field(task_obj, 'task_condition'),
            "record_id": getattr(spider, 'record_id', '-'),
            "finished_time": self._get_current_time(),
            "finish_reason": self._get_reason_display(reason),
            "item_scraped_count": item_count,
            "response_count": response_count,
            "error_count": error_count,
            "state": '✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束'
        }

    def _get_reason_display(self, reason):
        """Map a close reason to its display text; unknown reasons pass through unchanged."""
        reason_map = {
            'finished': '正常完成',
            'shutdown': '手动关闭',
            'cancelled': '被取消',
        }
        return reason_map.get(reason, reason)

    def _get_current_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _generate_signature(self, timestamp: int) -> str:
        """Compute the URL-encoded request signature.

        The signature is the Base64-encoded HMAC-SHA256 of
        ``"{timestamp}\\n{secret}"`` keyed with the secret, then URL-quoted.

        Args:
            timestamp: Timestamp in milliseconds.

        Returns:
            The signature string, or ``""`` when no secret is configured.
        """
        if not self.secret:
            return ""
        import hmac
        import hashlib
        import base64
        import urllib.parse

        string_to_sign = f"{timestamp}\n{self.secret}"
        digest = hmac.new(
            self.secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        return urllib.parse.quote_plus(base64.b64encode(digest))

    def _build_webhook_url(self) -> str:
        """Return the webhook URL, with timestamp and signature when a secret is set.

        Returns:
            The plain webhook URL if no secret is configured, otherwise the
            URL with ``timestamp`` and ``sign`` query parameters appended.
        """
        if not self.secret:
            return self.webhook_url
        timestamp = int(time.time() * 1000)
        sign = self._generate_signature(timestamp)
        # Start a query string if the configured URL does not have one yet.
        separator = "&" if "?" in self.webhook_url else "?"
        return f"{self.webhook_url}{separator}timestamp={timestamp}&sign={sign}"

    @staticmethod
    def _is_delivered(status_code, body):
        """Tell whether a webhook response means the message was accepted.

        A non-200 status is a failure. For a 200 status the JSON body is
        inspected: a non-zero ``errcode`` is a failure. A body that is not
        JSON, or has no ``errcode``, is treated as success.
        """
        if status_code != 200:
            return False
        try:
            payload = json.loads(body)
        except (TypeError, ValueError):
            return True
        if isinstance(payload, dict):
            return not payload.get('errcode')
        return True

    def _send_dingtalk_message(self, message, spider, is_error=False):
        """Post a markdown message to the webhook; failures are logged, never raised.

        Args:
            message: Markdown text of the notification.
            spider: Spider the notification is about (used for title and logs).
            is_error: When True, the message mentions everyone in the group.
        """
        # NOTE(review): requests.post is a blocking call and this runs inside
        # Scrapy signal handlers, so it presumably stalls the crawl for up to
        # the 10 s timeout per notification - confirm this is acceptable.
        try:
            webhook_url = self._build_webhook_url()
            headers = {
                "Content-Type": "application/json",
                "User-Agent": "DingTalk-Bot/1.0"
            }
            data = {
                "msgtype": "markdown",
                "markdown": {
                    "title": f"爬虫通知 - {spider.name}",
                    "text": message
                },
                "at": {
                    "isAtAll": is_error  # mention everyone only for errors
                }
            }
            response = requests.post(
                webhook_url,
                data=json.dumps(data),
                headers=headers,
                timeout=10
            )
            if self._is_delivered(response.status_code, response.text):
                self.logger.info("钉钉通知发送成功: %s", spider.name)
            else:
                self.logger.error(
                    "钉钉通知发送失败: %s - %s", response.status_code, response.text
                )
        except Exception as e:
            # Best-effort: a failed notification must never break the crawl.
            self.logger.error("发送钉钉通知时出错: %s", e)