diff --git a/.gitignore b/.gitignore
index 5d381cc..144c610 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,3 +160,4 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+/science_article_add/.idea/
diff --git a/science_article_add/science_article_add/extensions/__init__.py b/science_article_add/science_article_add/extensions/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/science_article_add/science_article_add/extensions/dingtalk_extension.py b/science_article_add/science_article_add/extensions/dingtalk_extension.py
new file mode 100644
index 0000000..fa85260
--- /dev/null
+++ b/science_article_add/science_article_add/extensions/dingtalk_extension.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/10/23 16:30
+# @Author : deepseek
+# @File : dingtalk_extension.py
+# extensions/dingtalk_extension.py
+import json
+import logging
+import requests
+from scrapy import signals
+from scrapy.exceptions import NotConfigured
+
+
+class DingTalkExtension:
+    """DingTalk robot notification extension."""
+
+    def __init__(self, webhook_url, spider_start_message=None, spider_closed_message=None):
+        self.webhook_url = webhook_url
+        self.spider_start_message = spider_start_message
+        self.spider_closed_message = spider_closed_message
+        self.stats = None
+        self.logger = logging.getLogger(__name__)
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        # Read the DingTalk webhook URL from the crawler settings
+        webhook_url = crawler.settings.get('DINGTALK_WEBHOOK_URL')
+        if not webhook_url:
+            raise NotConfigured('DINGTALK_WEBHOOK_URL must be set')
+
+        # Optional custom message templates
+        start_msg = crawler.settings.get('DINGTALK_START_MESSAGE')
+        closed_msg = crawler.settings.get('DINGTALK_CLOSED_MESSAGE')
+
+        ext = cls(webhook_url, start_msg, closed_msg)
+
+        # Register the signal handlers
+        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
+        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
+        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
+
+        return ext
+
+    def spider_opened(self, spider):
+        """Send a notification when the spider starts."""
+        self.stats = spider.crawler.stats
+        message = self.spider_start_message or self._get_default_start_message(spider)
+        self._send_dingtalk_message(message, spider)
+
+    def spider_closed(self, spider, reason):
+        """Send a notification when the spider closes."""
+        message = self.spider_closed_message or self._get_default_closed_message(spider, reason)
+        self._send_dingtalk_message(message, spider)
+
+    def spider_error(self, failure, response, spider):
+        """Send a notification when a spider callback raises an error."""
+        error_message = f"🚨 爬虫错误\n爬虫: {spider.name}\nURL: {response.url}\n错误: {str(failure.value)}"
+        self._send_dingtalk_message(error_message, spider, is_error=True)
+
+    def _get_default_start_message(self, spider):
+        """Default start-up message."""
+        return f"🚀 爬虫启动通知\n**爬虫名称**: {spider.name}\n**开始时间**: {self._get_current_time()}\n**状态**: 开始运行"
+
+    def _get_default_closed_message(self, spider, reason):
+        """Default completion message."""
+        stats = self.stats
+
+        # Collect crawl statistics
+        item_scraped_count = stats.get_value('item_scraped_count', 0)
+        response_count = stats.get_value('response_received_count', 0)
+        error_count = stats.get_value('log_count/ERROR', 0)
+        finish_reason = self._get_reason_display(reason)
+
+        message = f"""📊 爬虫完成通知
+**爬虫名称**: {spider.name}
+**完成时间**: {self._get_current_time()}
+**完成原因**: {finish_reason}
+**采集统计**:
+  - 采集项目: {item_scraped_count} 条
+  - 请求响应: {response_count} 次
+  - 错误数量: {error_count} 个
+**状态**: {'✅ 成功完成' if reason == 'finished' else '⚠️ 异常结束'}"""
+
+        return message
+
+    def _get_reason_display(self, reason):
+        """Map the finish reason to a human-readable description."""
+        reason_map = {
+            'finished': '正常完成',
+            'shutdown': '手动关闭',
+            'cancelled': '被取消',
+        }
+        return reason_map.get(reason, reason)
+
+    def _get_current_time(self):
+        """Return the current time as a formatted string."""
+        from datetime import datetime
+        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
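+    # The payload below follows the DingTalk custom-robot webhook format: a
+    # "markdown" msgtype with a title and text body, plus an "at" block.
+    # Note that this helper posts to the plain webhook URL only; if the robot
+    # has the signature security setting enabled, a timestamp and HMAC-SHA256
+    # "sign" parameter would also need to be appended to the URL (not done here).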
+    def _send_dingtalk_message(self, message, spider, is_error=False):
+        """Post the message to the DingTalk webhook."""
+        try:
+            headers = {'Content-Type': 'application/json'}
+
+            # Build the message payload
+            data = {
+                "msgtype": "markdown",
+                "markdown": {
+                    "title": f"爬虫通知 - {spider.name}",
+                    "text": message
+                },
+                "at": {
+                    "isAtAll": is_error  # @-mention everyone on errors
+                }
+            }
+
+            response = requests.post(
+                self.webhook_url,
+                data=json.dumps(data),
+                headers=headers,
+                timeout=10
+            )
+
+            if response.status_code == 200:
+                self.logger.info(f"钉钉通知发送成功: {spider.name}")
+            else:
+                self.logger.error(f"钉钉通知发送失败: {response.status_code} - {response.text}")
+
+        except Exception as e:
+            self.logger.error(f"发送钉钉通知时出错: {e}")
diff --git a/science_article_add/science_article_add/items.py b/science_article_add/science_article_add/items/__init__.py
similarity index 100%
rename from science_article_add/science_article_add/items.py
rename to science_article_add/science_article_add/items/__init__.py
diff --git a/science_article_add/science_article_add/items/wos.py b/science_article_add/science_article_add/items/wos.py
new file mode 100644
index 0000000..f676cb1
--- /dev/null
+++ b/science_article_add/science_article_add/items/wos.py
@@ -0,0 +1,27 @@
+import scrapy
+
+
+class WosItem(scrapy.Item):
+    # define the fields for your item here like:
+    third_id = scrapy.Field()
+    updated_at = scrapy.Field()
+
+
+class WosArticleItem(WosItem):
+    """
+    WOS publication item.
+    """
+    exported = scrapy.Field()
+
+
+class WosCitedNumberItem(WosItem):
+    """Citation count of a publication."""
+    third_id = scrapy.Field()
+    cited = scrapy.Field()
+    updated_at = scrapy.Field()
+
+
+class WosIdRelationItem(WosItem):
+    """Relation between a WOS UID and the query ids that matched it."""
+    # NOTE: field set assumed from how wos_latest_increment.py constructs this item
+    query_ids = scrapy.Field()
diff --git a/science_article_add/science_article_add/pipelines.py b/science_article_add/science_article_add/pipelines/__init__.py
similarity index 100%
rename from science_article_add/science_article_add/pipelines.py
rename to science_article_add/science_article_add/pipelines/__init__.py
diff --git a/science_article_add/science_article_add/pipelines/wos.py b/science_article_add/science_article_add/pipelines/wos.py
new file mode 100644
index 0000000..557b125
--- /dev/null
+++ b/science_article_add/science_article_add/pipelines/wos.py
@@ -0,0 +1,38 @@
+# pipelines.py
+import pymongo
+from itemadapter import ItemAdapter
+from science_article_add.items.wos import WosCitedNumberItem
+
+
+class MongoDBPipeline:
+    def __init__(self, mongo_uri, mongo_db):
+        self.mongo_uri = mongo_uri
+        self.mongo_db = mongo_db
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        return cls(
+            mongo_uri=crawler.settings.get('MONGO_URI'),
+            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_data')
+        )
+
+    def open_spider(self, spider):
+        self.client = pymongo.MongoClient(self.mongo_uri)
+        self.db = self.client[self.mongo_db]
+
+    def close_spider(self, spider):
+        self.client.close()
+
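+    # Route items to a collection by type: cited-number items go to
+    # relation_cited_number_wos, anything else to relation_cited_number_other.
+    # insert_one never deduplicates, so re-running a task inserts new documents.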
+    def process_item(self, item, spider):
+        adapter = ItemAdapter(item)
+
+        # Store into a different collection depending on the item type
+        if isinstance(item, WosCitedNumberItem):
+            collection_name = 'relation_cited_number_wos'
+        else:
+            collection_name = 'relation_cited_number_other'
+
+        # Insert the document
+        self.db[collection_name].insert_one(dict(adapter))
+
+        return item
diff --git a/science_article_add/science_article_add/settings.py b/science_article_add/science_article_add/settings.py
index d2f6a35..9316b79 100644
--- a/science_article_add/science_article_add/settings.py
+++ b/science_article_add/science_article_add/settings.py
@@ -92,3 +92,25 @@ REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'
 
 # Set settings whose default value is deprecated to a future-proof value
 FEED_EXPORT_ENCODING = "utf-8"
+
+
+# DingTalk robot configuration
+DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=1252fe1ef63e95ced11ac87a01e9978670e82036a516c558e524f89e11513f9f'
+DINGTALK_SECRET = 'SECe77fe7cd6c0dbfcdd9ebe6ba1941ddc376be86ca717e9d68bb177b7eded71091'
+
+
+# Custom message templates (optional)
+DINGTALK_START_MESSAGE = "🚀 爬虫启动啦!\n**爬虫**: {spider_name}\n**时间**: {time}"
+DINGTALK_CLOSED_MESSAGE = "✅ 爬虫完成!\n**爬虫**: {spider_name}\n**项目数**: {item_count}"
+
+# Enable the extension
+EXTENSIONS = {
+    'science_article_add.extensions.dingtalk_extension.DingTalkExtension': 500,
+    # 'science_article_add.extensions.advanced_dingtalk_extension.AdvancedDingTalkExtension': 100,
+}
+
+# Enable/disable specific notifications
+DINGTALK_ENABLE_START = True
+DINGTALK_ENABLE_FINISH = True
+DINGTALK_ENABLE_ERROR = True
+
diff --git a/science_article_add/science_article_add/spiders/wos_latest_increment.py b/science_article_add/science_article_add/spiders/wos_latest_increment.py
index 0f61b36..e058691 100644
--- a/science_article_add/science_article_add/spiders/wos_latest_increment.py
+++ b/science_article_add/science_article_add/spiders/wos_latest_increment.py
@@ -1,9 +1,12 @@
 import math
+from datetime import datetime
 from urllib.parse import urlencode
 from copy import deepcopy
+
 import scrapy
+from scrapy.http.response.json import JsonResponse
 
-from science_article_add.items import WosLiteAddItem
+from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem
 from science_article_add.models import wos_model as model
 from science_article_add.configs import wos as config
 from science_article_add.utils import tools
@@ -36,103 +39,53 @@ class WosLatestIncrementSpider(scrapy.Spider):
         self.query_content = task_obj['content']
         self.query_condition = task_obj['task_condition']
+        self.first_page = task_obj.get('first_page', 1)
+
     async def start(self):
         full_query = self.query_content
         if self.query_condition is not None:
             full_query = '%(query)s %(condition)s' % {'query': self.query_content, 'condition': self.query_condition}
-        yield scrapy.Request(url=config.WOS_LITE_QUERY_FIRST_API + '?' + urlencode(model.lite_base_model(usr_query=full_query)),
-                             dont_filter=True,
-                             meta={'query': full_query, 'PAGE': 1})
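+        # The same dict doubles as the WOS Starter API query parameters
+        # (q / page / limit / detail) and as the request meta, so the paging
+        # state travels with each request and is reused when building the
+        # next-page URL in parse().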
+        meta = dict(q=full_query, page=self.first_page, limit=50, detail="short")
+        yield scrapy.Request(url=config.WOS_STARTER_DOCUMENT_API + '?'
+                                 + urlencode(model.starter_documents_get(**meta)),
+                             meta=meta)
 
-    async def parse(self, response, **kwargs):
+    async def parse(self, response: JsonResponse, **kwargs):
         meta = response.meta
-        request = response.request
+        request: scrapy.Request = response.request
         task_query_id = self.query_id
         task_org_id = self.org_id
         task_record_id = self.record_id
         self.logger.debug('%s: %s' % ('parse_query_api', meta))
 
+        if response.status != 200:
+            self.logger.warning("""
+            响应异常
+            状态码: %s
+            响应内容: %s""" % (response.status, response.text))
+            return
+
+        req_meta = request.meta
         resp_result = response.json()
-
-        query_result = resp_result.get('QueryResult')
-        datas = resp_result.get('Data')
-
-        query_id = query_result.get('QueryID')
-        records_found = query_result.get('RecordsFound')
-        max_page = math.ceil(records_found / 100)
-        meta_copy: dict = deepcopy(meta)
-        meta_copy.update({'MAX_PAGE': max_page})
-        meta_copy.update({'TOTAL': records_found})
-        meta_copy.update({'QUERY_ID': query_id})
-        meta_copy.update({'next_page': meta['PAGE'] + 1})
-        meta_copy.update({'PAGE': meta['PAGE'] + 1})
-        meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
-
-        for data in datas:
-            add_item = WosLiteAddItem()
-            # 入库年份优先按照自己指定的
-            to_db_year = meta.get("search_year")
-            if not to_db_year:
-                publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
-                if publish_year:
-                    to_db_year = tools.str2int(publish_year[0])
-            add_item["third_id"] = data.get('UT')
-            add_item["year"] = to_db_year
-            add_item["query_ids"] = [task_query_id]
-            add_item["school_ids"] = [task_org_id]
-            add_item["task_ids"] = [task_record_id]
-            yield add_item
-
-        yield scrapy.Request(
-            url=config.WOS_LITE_QUERY_API + f'/{query_id}',
-            body=model.lite_query_model(**meta_copy),
-            meta=meta_copy,
-            callback=self.parse_query_api,
-        )
-
-    async def parse_query_api(self, response, **kwargs):
-        meta = response.meta
-        task_query_id = self.query_id
-        task_org_id = self.org_id
-        task_record_id = self.record_id
-        self.logger.debug("""
-        %s
-        %s""" % ('parse_query_api', meta))
-
-        resp_result = response.json()
-        query_id = meta.get('QUERY_ID')
-
-        datas = resp_result.get('Data', [])
-        if len(datas):
-            for data in datas:
-                add_item = WosLiteAddItem()
-                # 入库年份优先按照自己指定的
-                to_db_year = meta.get("search_year")
-                if not to_db_year:
-                    publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
-                    if publish_year:
-                        to_db_year = tools.str2int(publish_year[0])
-                add_item["third_id"] = data.get('UT')
-                add_item["year"] = to_db_year
-                add_item["query_ids"] = [task_query_id]
-                add_item["school_ids"] = [task_org_id]
-                add_item["task_ids"] = [task_record_id]
-                yield add_item
-        else:
-            # 根据条件记录生成sql记录结果
-            update_state_condition = ('batch_date = "%(batch_date)s"\n'
-                                      'query_id = %(query_id)s\n'
-                                      'task_condition = "%(task_condition)s"') % self.task_obj
-            self.logger.warning('没有数据了\n%s' % update_state_condition)
-            return
-        if meta['first_record'] < meta['TOTAL']:
-            meta_copy = deepcopy(meta)
-            meta_copy.update({'next_page': meta['PAGE'] + 1})
-            meta_copy.update({'PAGE': meta['PAGE'] + 1})
-            meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
-            yield scrapy.Request(
-                url=config.WOS_LITE_QUERY_API + f'/{query_id}',
-                body=model.lite_query_model(**meta_copy),
-                meta=meta_copy,
-                callback=self.parse_query_api,
-            )
+        metadata: dict = resp_result.get("metadata")
+        current_page = metadata.get("page")
+        records_found = metadata.get('total')
+
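+        # Pagination: the first page computes MAX_PAGE from metadata.total and
+        # stores it in the request meta; follow-up pages reuse the stored value
+        # and keep requesting page + 1 until current_page reaches MAX_PAGE.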
self.logger.info(""" + 检索式: %s + 检索到结果: %s""" % (req_meta.get("q"), records_found)) + max_page = req_meta["MAX_PAGE"] = math.ceil(records_found / config.WOS_STARTER_PER_PAGE_LIMIT) + batch_time = datetime.now() + hits: list = resp_result.get("hits") + for record in hits: + cited_num = tools.get_list_key(array=record.get("citations"), target="count", condition=("db", "WOS")) + if cited_num: + cited_item = WosCitedNumberItem(third_id=record.get("uid"), cited=cited_num, updated_at=batch_time) + yield cited_item + yield WosIdRelationItem(third_id=record.get("uid"), query_ids=[task_query_id], updated_at=batch_time) + + if current_page < max_page: + meta_copy: dict = deepcopy(req_meta) + meta_copy.update({'page': meta_copy['page'] + 1}) + yield scrapy.Request(config.WOS_STARTER_DOCUMENT_API + '?' + urlencode(model.starter_documents_get(**meta_copy)), + meta=meta_copy, + task_query_id=request.task_query_id) diff --git a/science_article_add/science_article_add/utils/tools.py b/science_article_add/science_article_add/utils/tools.py index 599b300..d379d26 100644 --- a/science_article_add/science_article_add/utils/tools.py +++ b/science_article_add/science_article_add/utils/tools.py @@ -1,3 +1,6 @@ +from typing import List, Tuple + + def str2int(val, replace=0): try: val = int(val) @@ -5,4 +8,20 @@ def str2int(val, replace=0): val = replace except TypeError: val = replace - return val \ No newline at end of file + return val + + +def get_list_key(array: List[dict], target: str, condition: Tuple[str, str]): + """ + 给定一个list [{key: val1, target: val2}, {key: val1, target: val2}] + 根据condition(key=val)返回第一个target对应的值 + :param target: + :param condition: + :param array: + :return: + """ + n, v = condition + for dic in array: + if dic.get(n) == v: + return dic.get(target) +