diff --git a/science_article_add/science_article_add/extensions/ackextension.py b/science_article_add/science_article_add/extensions/ackextension.py
new file mode 100644
index 0000000..3d52e85
--- /dev/null
+++ b/science_article_add/science_article_add/extensions/ackextension.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/10/27 17:12
+# @Author : zhaoxiangpeng
+# @File : ackextension.py
+import logging
+from typing import Protocol
+
+import pymysql
+from scrapy import signals
+from scrapy.crawler import Crawler
+
+logger = logging.getLogger(__name__)
+
+
+class SpiderProtocol(Protocol):
+    """Attributes and methods the extension expects the spider to provide."""
+    name: str
+    record_id: int
+    org_id: int
+    org_name: str
+    query_id: int
+    query_content: str
+
+    def get_records_found(self) -> int: ...
+
+
+class ACKExtension:
+    """Report task state to MySQL via Scrapy signals:
+    is_done is set to 2 on open, 1 on close and -1 on error.
+    """
+
+    def __init__(self, crawler: Crawler):
+        self.crawler = crawler
+        self.change_state_sql = 'update task_batch_record set %(update_kws)s where %(update_cond)s'
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        ext = cls(crawler=crawler)
+        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
+        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
+        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
+        return ext
+
+    def spider_opened(self, spider: SpiderProtocol):
+        # Mark the task record as picked up and running.
+        self._change_state(spider, is_done=2)
+
+    def spider_closed(self, spider: SpiderProtocol):
+        # Mark the task record as finished and store the result count.
+        self._change_state(
+            spider,
+            is_done=1,
+            result_count=spider.get_records_found(),
+            updated_time='CURRENT_TIMESTAMP',
+        )
+
+    def spider_error(self, failure, response, spider: SpiderProtocol):
+        # Mark the task record as failed.
+        logger.error('Spider %s failed: %s', spider.name, failure)
+        self._change_state(spider, is_done=-1, updated_time='CURRENT_TIMESTAMP')
+
+    def _change_state(self, spider: SpiderProtocol, **kws):
+        # Values here are internal ints or SQL keywords (CURRENT_TIMESTAMP), so plain
+        # interpolation is acceptable; use parameterized queries for untrusted input.
+        sql = self.change_state_sql % {
+            'update_kws': ', '.join(f'{k}={v}' for k, v in kws.items()),
+            'update_cond': f'id={spider.record_id}',
+        }
+        self._execute_sql(sql)
+
+    def _execute_sql(self, sql):
+        settings = self.crawler.settings
+        client = pymysql.connect(
+            host=settings.get('MYSQL_HOST'),
+            port=settings.getint('MYSQL_PORT', 3306),
+            database=settings.get('MYSQL_DATABASE'),
+            user=settings.get('MYSQL_USER'),
+            password=settings.get('MYSQL_PASSWORD'),
+        )
+        try:
+            with client.cursor() as cursor:
+                cursor.execute(sql)
+            client.commit()
+            logger.info('Executed SQL: %s', sql)
+        except Exception:
+            logger.exception('Failed to execute SQL: %s', sql)
+        finally:
+            client.close()
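
Enabling the extension (a minimal sketch, not part of this diff): Scrapy loads extensions from the EXTENSIONS setting, and _execute_sql reads the MYSQL_* keys from the crawler settings, so the project settings module would need entries along these lines. The module path follows the file added above; all concrete values are placeholders.

    # settings.py -- sketch; every value below is a placeholder
    EXTENSIONS = {
        'science_article_add.extensions.ackextension.ACKExtension': 500,
    }
    MYSQL_HOST = '127.0.0.1'
    MYSQL_PORT = 3306
    MYSQL_DATABASE = 'science_article'
    MYSQL_USER = 'scrapy'
    MYSQL_PASSWORD = 'change-me'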