# -*- coding: utf-8 -*-
# @Time : 2026/1/14 13:59
# @Author : zhaoxiangpeng
# @File : crawl_article_latest.py

from typing import List

import pymysql
from pymysql import cursors
from twisted.internet import defer, task
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider
from science_article_wos.utils import tools

# Parameterized statements: values are handed to cursor.execute() as a dict,
# so pymysql does the quoting/escaping instead of raw string interpolation.
CREATE_RECORD_SQL = (
    "INSERT INTO task_batch_record (batch_date, query_id, task_condition) "
    "VALUES (%(batch_date)s, %(query_id)s, %(task_condition)s)"
)

SELECT_RECORD_SQL = """
SELECT
    b.id AS task_id,
    q.id AS query_id,
    q.content AS content,
    b.task_condition AS task_condition,
    q.source_type AS source_type,
    b.is_done AS is_done
FROM task_batch_record AS b
JOIN task_search_strategy AS q ON q.id = b.query_id
WHERE b.is_done = 0 AND q.source_type = 1
LIMIT %(limit)s
"""


def get_connect() -> pymysql.Connection:
    conn: pymysql.Connection = pymysql.connect(
        host='43.140.203.187',
        port=3306,
        database='science_data_dept',
        user='science-data-dept',
        passwd='datadept1509',
    )
    return conn


def get_strategy_id_by_name(name: str, source_type: int = 1) -> int:
    """Resolve the search-strategy id bound to an organisation name."""
    conn = get_connect()
    cursor = conn.cursor(cursors.DictCursor)
    cursor.execute(
        """SELECT r.org_id, q.id AS query_id, q.content, q.param,
                  q.interval_unit, q.disable_flag, q.state
           FROM relation_org_query AS r
           JOIN task_search_strategy AS q ON r.query_id = q.id
           WHERE r.org_name = %(org_name)s
             AND source_type = %(source_type)s
             AND disable_flag = 0""",
        {'org_name': name, 'source_type': source_type},
    )
    r = cursor.fetchone()
    cursor.close()
    conn.close()
    if r is None:
        raise ValueError('no enabled strategy found for org_name=%r' % name)
    return r.get('query_id')


def starter_latest_all():

    @defer.inlineCallbacks
    def f(running: bool = True):
        # Imported here so CrawlerProcess can install its preferred reactor
        # before twisted.internet.reactor is touched.
        from twisted.internet import reactor

        while running:
            client = get_connect()
            cursor = client.cursor(cursors.DictCursor)
            # Look for a strategy that still needs to be scheduled.
            cursor.execute(
                'select id from task_search_strategy '
                'where disable_flag=0 and source_type=1 and state=0 limit 1'
            )
            search_strategy_r = cursor.fetchone()
            if search_strategy_r:
                # Add the task to the batch table ...
                strategy_id = search_strategy_r.get('id')
                cursor.execute(CREATE_RECORD_SQL, {
                    'batch_date': tools.get_today_date(),
                    'query_id': strategy_id,
                    'task_condition': 'AND PY=(2025-2026)',
                })
                # ... and mark the strategy as scheduled (state=1).
                cursor.execute(
                    'update task_search_strategy '
                    'set state=1, updated_time=CURRENT_TIMESTAMP where id=%s',
                    (strategy_id,),
                )
                cursor.connection.commit()

            # Take one pending task from the batch record table and run it.
            cursor.execute(SELECT_RECORD_SQL, {'limit': 1})
            result = cursor.fetchone()
            if result:
                query_id = result['query_id']
                cursor.execute(
                    'select org_id, org_name from relation_org_query where query_id=%s',
                    (query_id,),
                )
                org_results: List[dict] = cursor.fetchall()
                result['org_id'] = [org_result['org_id'] for org_result in org_results]
                result['org_name'] = [org_result['org_name'] for org_result in org_results]
                init_params = result
                cursor.close()
                client.close()
                yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
                # Pause 60s between crawls without blocking the reactor
                # (time.sleep() would freeze the whole Twisted event loop here).
                yield task.deferLater(reactor, 60, lambda: None)
            else:
                running = False
                cursor.close()
                client.close()

    process = CrawlerProcess(get_project_settings())
    f(True)
    process.start()
    process.stop()


def starter_latest_one(strategy_id: int = None, org_name: str = None):
    if strategy_id is None:
        if org_name is None:
            raise ValueError('either strategy_id or org_name must be given')
        strategy_id = get_strategy_id_by_name(org_name)

    def starter():
        process = CrawlerProcess(get_project_settings())
        # NOTE: the resolved strategy_id is not passed to the spider here;
        # the original left that wiring unfinished.
        process.crawl(WosLatestIncrementSpider)
        process.start()

    starter()


if __name__ == '__main__':
    starter_latest_all()
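# A minimal usage sketch for the single-strategy entry point, assuming the
# name exists in relation_org_query.org_name ('Example University' is a
# placeholder, not a real row). Uncomment to run a single crawl instead of
# the full batch loop above:
#
# if __name__ == '__main__':
#     starter_latest_one(org_name='Example University')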