add:任务完成mysql确认

main
zhaoxiangpeng 2 months ago
parent ea68319ee6
commit 129ab6569d

@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/27 17:12
# @Author : zhaoxiangpeng
# @File : ackextension.py
import logging
import pymysql
from scrapy import signals
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
class SpiderProtocol:
name: str
record_id: int
org_id: int
org_name: str
query_id: int
query_content: str
def get_records_found(self) -> int: ...
class ACKExtension:
def __init__(self, crawler: Crawler):
self.crawler = crawler
self.change_state_sql = 'update task_batch_record set %(update_kws)s where id=%(record_id)s'
@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler=crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_error, signal=signals.spider_error)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
return ext
def spider_opened(self, spider):
kws = {
'is_done': 2,
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_closed(self, spider: SpiderProtocol):
"""
# 修改任务状态
# 通知
"""
kws = {
'is_done': 1,
'result_count': spider.get_records_found(),
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def spider_error(self, spider: SpiderProtocol):
kws = {
'is_done': -1,
'updated_time': 'CURRENT_TIMESTAMP'
}
sql = self.change_state_sql % {
'update_kws': ', '.join([f'{k}={v}' for k, v in kws.items()]),
'update_cond': 'id=%(record_id)s' % {'record_id': spider.record_id}
}
self._execute_sql(sql)
def _execute_sql(self, sql):
settings = self.crawler.settings
client = pymysql.connect(
host=settings.get('MYSQL_HOST'),
port=settings.get('MYSQL_PORT', 3306),
database=settings.get('MYSQL_DATABASE'),
user=settings.get('MYSQL_USER'),
passwd=settings.get('MYSQL_PASSWORD'),
)
try:
cursor = client.cursor()
cursor.execute(sql)
cursor.connection.commit()
logger.info(f'Execute SQL: {sql}')
except Exception as e:
logger.exception(e)
finally:
client.close()
Loading…
Cancel
Save