From fe7044292b379413cbf6178afb6554cd644fb0cf Mon Sep 17 00:00:00 2001 From: zhaoxiangpeng <1943364377@qq.com> Date: Wed, 19 Nov 2025 17:11:19 +0800 Subject: [PATCH] =?UTF-8?q?change:=E4=BB=BB=E5=8A=A1=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E7=A1=AE=E8=AE=A4=E5=92=8C=E9=92=89=E9=92=89=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../extensions/ackextension.py | 2 +- .../extensions/dingtalk_extension.py | 175 ++++++++++++++---- 2 files changed, 144 insertions(+), 33 deletions(-) diff --git a/science_article_add/science_article_add/extensions/ackextension.py b/science_article_add/science_article_add/extensions/ackextension.py index 3d52e85..05f13a3 100644 --- a/science_article_add/science_article_add/extensions/ackextension.py +++ b/science_article_add/science_article_add/extensions/ackextension.py @@ -24,7 +24,7 @@ class SpiderProtocol: class ACKExtension: def __init__(self, crawler: Crawler): self.crawler = crawler - self.change_state_sql = 'update task_batch_record set %(update_kws)s where id=%(record_id)s' + self.change_state_sql = 'update task_batch_record set %(update_kws)s where %(update_cond)s' @classmethod def from_crawler(cls, crawler): diff --git a/science_article_add/science_article_add/extensions/dingtalk_extension.py b/science_article_add/science_article_add/extensions/dingtalk_extension.py index fa85260..671d97c 100644 --- a/science_article_add/science_article_add/extensions/dingtalk_extension.py +++ b/science_article_add/science_article_add/extensions/dingtalk_extension.py @@ -4,61 +4,124 @@ # @File : dingtalk.py # extensions/dingtalk_extension.py import json +import time import logging import requests from scrapy import signals from scrapy.exceptions import NotConfigured +SPIDER_START_MSG = """🚀 爬虫启动通知\n +**爬虫名称**: %(spider_name)s\n +**开始时间**: %(started_time)s\n +**状态**: 开始运行""" +# SPIDER_CLOSED_MSG = """📊 爬虫完成通知\n +# **爬虫名称**: %(spider_name)s\n +# **完成时间**: %(finished_time)s\n +# **完成原因**: %(finish_reason)s\n +# **采集统计**:\n +# - 采集项目: %(item_scraped_count)s 条 +# - 请求响应: %(response_count)s 次 +# - 错误数量: %(error_count)s 个\n +# **状态**: %(state)s""" +SPIDER_ERROR_MSG = """🚨 爬虫错误\n +**爬虫名称**: %(spider_name)s\n +**URL**: %(url)s\n +**错误**: %(err_msg)s""" +SPIDER_CLOSED_MSG = """📊 爬虫完成通知\n +**爬虫名称**: %(spider_name)s\n +**机构名称**: %(org_name)s\n +**任务条件**: %(task_condition)s\n +**任务ID**: %(record_id)s\n +**完成时间**: %(finished_time)s\n +**完成原因**: %(finish_reason)s\n +**采集统计**:\n + - 采集项目: %(item_scraped_count)s 条 + - 请求响应: %(response_count)s 次 + - 错误数量: %(error_count)s 个\n +**状态**: %(state)s""" + class DingTalkExtension: """钉钉机器人通知扩展""" - def __init__(self, webhook_url, spider_start_message=None, spider_closed_message=None): + def __init__( + self, + crawler, + webhook_url=None, secret=None, + start_msg=None, closed_msg=None, + **kwargs + ): + self.crawler = crawler + self.stats = crawler.stats + self.webhook_url = webhook_url - self.spider_start_message = spider_start_message - self.spider_closed_message = spider_closed_message - self.stats = None + self.secret = secret + + self.spider_start_message = start_msg or SPIDER_START_MSG + self.spider_closed_message = closed_msg or SPIDER_CLOSED_MSG + self.enable_start_notify = kwargs.get("enable_start_notify", False) + self.enable_finish_notify = kwargs.get("enable_finish_notify", False) + self.enable_error_notify = kwargs.get("enable_error_notify", False) + self.logger = logging.getLogger(__name__) @classmethod def from_crawler(cls, crawler): # 从配置中获取钉钉webhook URL - webhook_url = crawler.settings.get('DINGTALK_WEBHOOK_URL') + webhook_url = crawler.settings['DINGTALK_WEBHOOK_URL'] if not webhook_url: raise NotConfigured('DINGTALK_WEBHOOK_URL must be set') - - # 获取自定义消息模板 - start_msg = crawler.settings.get('DINGTALK_START_MESSAGE') - closed_msg = crawler.settings.get('DINGTALK_CLOSED_MESSAGE') - - ext = cls(webhook_url, start_msg, closed_msg) + ding_cfg = dict( + webhook_url=crawler.settings.get('DINGTALK_WEBHOOK_URL'), + secret=crawler.settings.get('DINGTALK_SECRET'), + # 获取自定义消息模板 + start_msg=crawler.settings.get('DINGTALK_START_MESSAGE', SPIDER_START_MSG), + closed_msg=crawler.settings.get('DINGTALK_CLOSED_MESSAGE', SPIDER_CLOSED_MSG), + enable_start_notify=crawler.settings.getbool('DINGTALK_ENABLE_START', False), + enable_finish_notify=crawler.settings.getbool('DINGTALK_ENABLE_FINISH', False), + enable_error_notify=crawler.settings.getbool('DINGTALK_ENABLE_ERROR', False), + ) + + ext = cls(crawler=crawler, **ding_cfg) # 注册信号处理器 - crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened) - crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed) - crawler.signals.connect(ext.spider_error, signal=signals.spider_error) + if ext.enable_start_notify: + crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened) + if ext.enable_finish_notify: + crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed) + if ext.enable_error_notify: + crawler.signals.connect(ext.spider_error, signal=signals.spider_error) + crawler.signals.connect(ext.item_error, signal=signals.item_error) return ext def spider_opened(self, spider): """爬虫开始时发送通知""" self.stats = spider.crawler.stats - message = self.spider_start_message or self._get_default_start_message(spider) + message = self._get_default_start_message(spider) self._send_dingtalk_message(message, spider) def spider_closed(self, spider, reason): """爬虫结束时发送通知""" - message = self.spider_closed_message or self._get_default_closed_message(spider, reason) + message = self._get_default_closed_message(spider, reason) self._send_dingtalk_message(message, spider) def spider_error(self, failure, response, spider): """爬虫错误时发送通知""" - error_message = f"🚨 爬虫错误\n爬虫: {spider.name}\nURL: {response.url}\n错误: {str(failure.value)}" - self._send_dingtalk_message(error_message, spider, is_error=True) + message = SPIDER_ERROR_MSG % { + "spider_name": spider.name, + "url": response.url, + "err_msg": str(failure.value), + } + self._send_dingtalk_message(message, spider, is_error=True) + + def item_error(self, failure, response, spider): + pass def _get_default_start_message(self, spider): """默认开始消息模板""" - return f"🚀 爬虫启动通知\n**爬虫名称**: {spider.name}\n**开始时间**: {self._get_current_time()}\n**状态**: 开始运行" + message = self.spider_start_message % {"spider_name": spider.name, "started_time": self._get_current_time()} + return message def _get_default_closed_message(self, spider, reason): """默认结束消息模板""" @@ -69,17 +132,19 @@ class DingTalkExtension: response_count = stats.get_value('response_received_count', 0) error_count = stats.get_value('log_count/ERROR', 0) finish_reason = self._get_reason_display(reason) - - message = f"""📊 爬虫完成通知 -**爬虫名称**: {spider.name} -**完成时间**: {self._get_current_time()} -**完成原因**: {finish_reason} -**采集统计**: - - 采集项目: {item_scraped_count} 条 - - 请求响应: {response_count} 次 - - 错误数量: {error_count} 个 -**状态**: {'✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束'}""" - + task_obj = spider.task_obj + message = self.spider_closed_message % { + "spider_name": spider.name, + "org_name": task_obj['org_name'], + "task_condition": task_obj['task_condition'], + "record_id": spider.record_id, + "finished_time": self._get_current_time(), + "finish_reason": finish_reason, + "item_scraped_count": spider.get_records_found(), + "response_count": response_count, + "error_count": error_count, + "state": '✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束' + } return message def _get_reason_display(self, reason): @@ -96,10 +161,56 @@ class DingTalkExtension: from datetime import datetime return datetime.now().strftime('%Y-%m-%d %H:%M:%S') + def _generate_signature(self, timestamp: int) -> str: + """ + 生成签名 + + Args: + timestamp: 时间戳 + + Returns: + 签名字符串 + """ + if not self.secret: + return "" + + import hmac + import hashlib + import base64 + import urllib.parse + + string_to_sign = f"{timestamp}\n{self.secret}" + hmac_code = hmac.new( + self.secret.encode('utf-8'), + string_to_sign.encode('utf-8'), + digestmod=hashlib.sha256 + ).digest() + + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) + return sign + + def _build_webhook_url(self) -> str: + """ + 构建完整的webhook URL(包含签名) + + Returns: + 完整的webhook URL + """ + if not self.secret: + return self.webhook_url + + timestamp = int(time.time() * 1000) + sign = self._generate_signature(timestamp) + return f"{self.webhook_url}×tamp={timestamp}&sign={sign}" + def _send_dingtalk_message(self, message, spider, is_error=False): """发送钉钉消息""" try: - headers = {'Content-Type': 'application/json'} + webhook_url = self._build_webhook_url() + headers = { + "Content-Type": "application/json", + "User-Agent": "DingTalk-Bot/1.0" + } # 构建消息体 data = { @@ -114,7 +225,7 @@ class DingTalkExtension: } response = requests.post( - self.webhook_url, + webhook_url, data=json.dumps(data), headers=headers, timeout=10