diff --git a/science_article_wos/starter/crawl_article_latest.py b/science_article_wos/starter/crawl_article_latest.py
index 930b2c9..960d91d 100644
--- a/science_article_wos/starter/crawl_article_latest.py
+++ b/science_article_wos/starter/crawl_article_latest.py
@@ -3,7 +3,7 @@
 # @Author : zhaoxiangpeng
 # @File : crawl_article_latest.py
 
-import math
+import time
 from typing import List
 import pymysql
 from pymysql import cursors
@@ -11,6 +11,7 @@ from twisted.internet import defer
 from scrapy.crawler import CrawlerProcess
 from scrapy.utils.project import get_project_settings
 from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider
+from science_article_wos.utils import tools
 
 CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)'''
 SELECT_RECORD_SQL = """
@@ -31,30 +32,99 @@
 WHERE
 """
 
 
+def get_connect() -> pymysql.Connection:
+    conn: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
+                                               database='science_data_dept', user='science-data-dept',
+                                               passwd='datadept1509', )
+    return conn
+
+
+def get_strategy_id_by_name(name: str, source_type: int = 1) -> int:
+    conn = get_connect()
+    cursor = conn.cursor(cursors.DictCursor)
+    cursor.execute('''SELECT
+    r.org_id,
+    q.id AS query_id,
+    q.content,
+    q.param,
+    q.interval_unit,
+    q.disable_flag,
+    q.state
+FROM
+    relation_org_query AS r
+    JOIN task_search_strategy AS q ON r.query_id = q.id
+WHERE
+    r.org_name="%(org_name)s"
+    AND source_type = %(source_type)s
+    AND disable_flag = 0''' % {'org_name': name, 'source_type': source_type})
+    r = cursor.fetchone()
+    return r.get('query_id')
+
+
 def starter_latest_all():
     @defer.inlineCallbacks
-    def f():
-        client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
-                                                     database='science_data_dept', user='science-data-dept',
-                                                     passwd='datadept1509', )
-        cursor = client.cursor(cursors.DictCursor)
-        cursor.execute(SELECT_RECORD_SQL % {'limit': 1})
-        result = cursor.fetchone()
-        query_id = result['query_id']
-        cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
-        org_results: List[dict] = cursor.fetchall()
-        result['org_id'] = [org_result['org_id'] for org_result in org_results]
-        result['org_name'] = [org_result['org_name'] for org_result in org_results]
-        init_params = result
-        yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
+    def f(running: bool = True):
+        while running:
+            client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
+                                                         database='science_data_dept', user='science-data-dept',
+                                                         passwd='datadept1509', )
+            cursor = client.cursor(cursors.DictCursor)
+
+            # Pick one pending task from the search-strategy table
+            cursor.execute('select id from task_search_strategy where disable_flag=0 and source_type=1 and state=0 limit 1')
+            search_strategy_r = cursor.fetchone()
+            if not search_strategy_r:
+                running = False
+                cursor.close()
+                client.close()
+                break  # no pending strategies left; stop the loop
+            # Add the task to the batch record table
+            strategy_id = search_strategy_r.get('id')
+            insert_sql = CREATE_RECORD_SQL % {
+                'batch_date': tools.get_today_date(),
+                'query_id': strategy_id,
+                'task_condition': '"AND PY=(2025-2026)"'
+            }
+            cursor.execute(insert_sql)
+            # Mark the strategy as taken (state=1) once it is recorded
+            cursor.execute('update task_search_strategy set state=1, updated_time=CURRENT_TIMESTAMP where id=%s', (strategy_id,))
+            cursor.connection.commit()
+
+            # Take a task from the record table and run it
+            cursor.execute(SELECT_RECORD_SQL % {'limit': 1})
+            result = cursor.fetchone()
+            if result:
+                query_id = result['query_id']
+                cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
+                org_results: List[dict] = cursor.fetchall()
+                result['org_id'] = [org_result['org_id'] for org_result in org_results]
+                result['org_name'] = [org_result['org_name'] for org_result in org_results]
+
+                init_params = result
+                cursor.close()
+                client.close()
+
+                yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
+                time.sleep(60)  # crude pacing between tasks
+
+            else:
+                running = False
+                cursor.close()
+                client.close()
 
     process = CrawlerProcess(get_project_settings())
-    f()
+    f(True)
     process.start()
     process.stop()
 
 
+def starter_latest_one(strategy_id: int = None, org_name: str = None):
+    if strategy_id is None:
+        if org_name is None:
+            raise Exception('Specify either strategy_id or org_name')
+        strategy_id = get_strategy_id_by_name(org_name)
+
 
 def starter():
     process = CrawlerProcess(get_project_settings())
     process.crawl(WosLatestIncrementSpider)
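
Usage note (not part of the patch): a minimal sketch of how the new entry points might be invoked, assuming the module is run directly; the __main__ guard and the sample org_name are illustrative, not taken from the repository.

    if __name__ == '__main__':
        # Drain every pending strategy (source_type=1, state=0); the loop in
        # starter_latest_all() records one batch per strategy and crawls it.
        starter_latest_all()
        # Or resolve a single strategy by organization name; note that in this
        # patch starter_latest_one() only resolves the id so far (illustrative value):
        # starter_latest_one(org_name='Some University')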