You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

379 lines
11 KiB
Python

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import aiohttp
from typing import Dict, List, Any, Optional
from enum import Enum
import logging
from dataclasses import dataclass
import time
logger = logging.getLogger(__name__)
class DingTalkMessageType(Enum):
"""钉钉消息类型枚举"""
TEXT = "text"
LINK = "link"
MARKDOWN = "markdown"
ACTION_CARD = "actionCard"
FEED_CARD = "feedCard"
@dataclass
class DingTalkConfig:
"""钉钉配置数据类"""
webhook: str
secret: Optional[str] = None
at_mobiles: Optional[List[str]] = None
at_user_ids: Optional[List[str]] = None
at_all: bool = False
class DingTalkSender:
"""
钉钉消息推送器
功能描述:
1. 支持多种消息类型文本、链接、Markdown、ActionCard、FeedCard
2. 支持@指定用户或@所有人
3. 支持签名安全设置
4. 支持异步发送和批量发送
5. 内置重试机制和错误处理
"""
def __init__(self, config: DingTalkConfig):
"""
初始化钉钉消息发送器
Args:
config: 钉钉机器人配置
"""
self.config = config
self.session: Optional[aiohttp.ClientSession] = None
self._retry_count = 3
self._retry_delay = 1
async def __aenter__(self):
"""异步上下文管理器入口"""
await self._ensure_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
await self.close()
async def _ensure_session(self):
"""确保会话存在"""
if self.session is None:
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10)
)
async def close(self):
"""关闭会话"""
if self.session:
await self.session.close()
self.session = None
def _generate_signature(self, timestamp: int) -> str:
"""
生成签名
Args:
timestamp: 时间戳
Returns:
签名字符串
"""
if not self.config.secret:
return ""
import hmac
import hashlib
import base64
import urllib.parse
string_to_sign = f"{timestamp}\n{self.config.secret}"
hmac_code = hmac.new(
self.config.secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def _build_webhook_url(self) -> str:
"""
构建完整的webhook URL包含签名
Returns:
完整的webhook URL
"""
if not self.config.secret:
return self.config.webhook
timestamp = int(time.time() * 1000)
sign = self._generate_signature(timestamp)
return f"{self.config.webhook}&timestamp={timestamp}&sign={sign}"
def _build_at_info(self) -> Dict[str, Any]:
"""
构建@信息
Returns:
@信息字典
"""
at_info = {}
if self.config.at_mobiles:
at_info["atMobiles"] = self.config.at_mobiles
if self.config.at_user_ids:
at_info["atUserIds"] = self.config.at_user_ids
if self.config.at_all:
at_info["isAtAll"] = True
return at_info
async def _send_request(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送请求到钉钉
Args:
data: 请求数据
Returns:
响应数据
Raises:
Exception: 发送失败时抛出异常
"""
await self._ensure_session()
webhook_url = self._build_webhook_url()
headers = {
"Content-Type": "application/json",
"User-Agent": "DingTalk-Bot/1.0"
}
last_exception = None
for attempt in range(self._retry_count):
try:
logger.info(f"发送钉钉消息,尝试 {attempt + 1}/{self._retry_count}")
async with self.session.post(
webhook_url,
json=data,
headers=headers
) as response:
result = await response.json()
if response.status == 200 and result.get("errcode") == 0:
logger.info("钉钉消息发送成功")
return result
else:
error_msg = f"钉钉消息发送失败: {result.get('errmsg', 'Unknown error')}"
logger.error(error_msg)
last_exception = Exception(error_msg)
except asyncio.TimeoutError:
error_msg = f"钉钉消息发送超时,尝试 {attempt + 1}/{self._retry_count}"
logger.warning(error_msg)
last_exception = Exception(error_msg)
except Exception as e:
error_msg = f"钉钉消息发送异常: {str(e)},尝试 {attempt + 1}/{self._retry_count}"
logger.error(error_msg)
last_exception = e
# 如果不是最后一次尝试,等待重试
if attempt < self._retry_count - 1:
await asyncio.sleep(self._retry_delay * (attempt + 1))
# 所有重试都失败,抛出异常
raise last_exception or Exception("钉钉消息发送失败")
async def send_text(self, content: str, at_mobiles: Optional[List[str]] = None,
at_user_ids: Optional[List[str]] = None, at_all: Optional[bool] = None) -> Dict[str, Any]:
"""
发送文本消息
Args:
content: 消息内容
at_mobiles: @的手机号列表
at_user_ids: @的用户ID列表
at_all: 是否@所有人
Returns:
发送结果
"""
at_info = self._build_at_info()
# 覆盖默认的@设置
if at_mobiles is not None:
at_info["atMobiles"] = at_mobiles
if at_user_ids is not None:
at_info["atUserIds"] = at_user_ids
if at_all is not None:
at_info["isAtAll"] = at_all
data = {
"msgtype": DingTalkMessageType.TEXT.value,
"text": {
"content": content
},
"at": at_info
}
return await self._send_request(data)
async def send_markdown(self, title: str, text: str, at_mobiles: Optional[List[str]] = None,
at_user_ids: Optional[List[str]] = None, at_all: Optional[bool] = None) -> Dict[str, Any]:
"""
发送Markdown消息
Args:
title: 消息标题
text: Markdown格式的消息内容
at_mobiles: @的手机号列表
at_user_ids: @的用户ID列表
at_all: 是否@所有人
Returns:
发送结果
"""
at_info = self._build_at_info()
if at_mobiles is not None:
at_info["atMobiles"] = at_mobiles
if at_user_ids is not None:
at_info["atUserIds"] = at_user_ids
if at_all is not None:
at_info["isAtAll"] = at_all
data = {
"msgtype": DingTalkMessageType.MARKDOWN.value,
"markdown": {
"title": title,
"text": text
},
"at": at_info
}
return await self._send_request(data)
async def send_link(self, title: str, text: str, message_url: str,
pic_url: Optional[str] = None) -> Dict[str, Any]:
"""
发送链接消息
Args:
title: 消息标题
text: 消息内容
message_url: 点击消息跳转的URL
pic_url: 图片URL
Returns:
发送结果
"""
data = {
"msgtype": DingTalkMessageType.LINK.value,
"link": {
"title": title,
"text": text,
"messageUrl": message_url,
}
}
if pic_url:
data["link"]["picUrl"] = pic_url
return await self._send_request(data)
async def send_action_card(self, title: str, text: str, single_title: str,
single_url: str, btn_orientation: str = "0") -> Dict[str, Any]:
"""
发送整体跳转ActionCard消息
Args:
title: 消息标题
text: 消息内容
single_title: 单个按钮标题
single_url: 单个按钮跳转URL
btn_orientation: 按钮排列方向0-竖直1-横向
Returns:
发送结果
"""
data = {
"msgtype": DingTalkMessageType.ACTION_CARD.value,
"actionCard": {
"title": title,
"text": text,
"singleTitle": single_title,
"singleURL": single_url,
"btnOrientation": btn_orientation
}
}
return await self._send_request(data)
async def send_feed_card(self, links: List[Dict[str, str]]) -> Dict[str, Any]:
"""
发送FeedCard消息
Args:
links: 链接列表每个链接包含title, messageURL, picURL
Returns:
发送结果
"""
data = {
"msgtype": DingTalkMessageType.FEED_CARD.value,
"feedCard": {
"links": links
}
}
return await self._send_request(data)
async def send_alert(self, title: str, message: str, level: str = "info",
at_users: bool = False) -> Dict[str, Any]:
"""
发送告警消息(便捷方法)
Args:
title: 告警标题
message: 告警内容
level: 告警级别 (info, warning, error, critical)
at_users: 是否@相关人员
Returns:
发送结果
"""
level_emojis = {
"info": "",
"warning": "⚠️",
"error": "",
"critical": "🚨"
}
emoji = level_emojis.get(level, "")
markdown_text = f"""
## {emoji} {title}
**级别**: {level.upper()}
**时间**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**详情**:
{message}
""".strip()
at_all = at_users and self.config.at_all
at_mobiles = self.config.at_mobiles if at_users else None
at_user_ids = self.config.at_user_ids if at_users else None
return await self.send_markdown(
title=f"{emoji} {title}",
text=markdown_text,
at_mobiles=at_mobiles,
at_user_ids=at_user_ids,
at_all=at_all
)