wos:从策略表拉任务

main
zhaoxiangpeng 3 weeks ago
parent 3025d07403
commit 3e3d237388

@ -3,7 +3,7 @@
# @Author : zhaoxiangpeng # @Author : zhaoxiangpeng
# @File : crawl_article_latest.py # @File : crawl_article_latest.py
import math import time
from typing import List from typing import List
import pymysql import pymysql
from pymysql import cursors from pymysql import cursors
@ -11,6 +11,7 @@ from twisted.internet import defer
from scrapy.crawler import CrawlerProcess from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings from scrapy.utils.project import get_project_settings
from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider from science_article_wos.spiders.wos_latest_increment import WosLatestIncrementSpider
from science_article_wos.utils import tools
CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)''' CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)'''
SELECT_RECORD_SQL = """ SELECT_RECORD_SQL = """
@ -31,15 +32,67 @@ WHERE
""" """
def get_connect() -> pymysql.Connection:
    """Open a new connection to the science_data_dept MySQL database.

    Connection settings can be overridden via the environment
    (SCIENCE_DB_HOST / SCIENCE_DB_PORT / SCIENCE_DB_NAME /
    SCIENCE_DB_USER / SCIENCE_DB_PASSWORD); the previous hard-coded
    values are kept as defaults so existing callers keep working.

    SECURITY NOTE: database credentials should not live in source
    control -- prefer supplying them through the environment variables
    above (or a secrets manager).

    Returns:
        A freshly opened pymysql connection; the caller owns it and is
        responsible for closing it.
    """
    import os  # local import keeps this fix self-contained

    conn: pymysql.Connection = pymysql.connect(
        host=os.getenv('SCIENCE_DB_HOST', '43.140.203.187'),
        port=int(os.getenv('SCIENCE_DB_PORT', '3306')),
        database=os.getenv('SCIENCE_DB_NAME', 'science_data_dept'),
        user=os.getenv('SCIENCE_DB_USER', 'science-data-dept'),
        passwd=os.getenv('SCIENCE_DB_PASSWORD', 'datadept1509'),
    )
    return conn
def get_strategy_id_by_name(name: str, source_type: int = 1) -> int:
    """Resolve an organisation name to its enabled search-strategy id.

    Joins relation_org_query with task_search_strategy and returns the
    strategy (query) id linked to *name*.

    Args:
        name: organisation name (relation_org_query.org_name).
        source_type: data-source discriminator; 1 presumably means
            Web of Science here -- TODO confirm against the schema.

    Returns:
        The matching task_search_strategy.id.

    Raises:
        LookupError: when no enabled strategy is linked to *name*
            (the original code crashed with AttributeError on None).
    """
    conn = get_connect()
    try:
        cursor = conn.cursor(cursors.DictCursor)
        # BUGFIX: the original interpolated `name` into the SQL with
        # %-formatting, which is SQL-injection-prone and breaks on any
        # org name containing a quote. Use driver-side parameters.
        cursor.execute(
            '''SELECT
        r.org_id,
        q.id AS query_id,
        q.content,
        q.param,
        q.interval_unit,
        q.disable_flag,
        q.state
    FROM
        relation_org_query AS r
        JOIN task_search_strategy AS q ON r.query_id = q.id
    WHERE
        r.org_name = %s
        AND source_type = %s
        AND disable_flag = 0''',
            (name, source_type),
        )
        row = cursor.fetchone()
    finally:
        # BUGFIX: the original leaked the connection.
        conn.close()
    if row is None:
        raise LookupError('no enabled strategy found for org_name=%r' % (name,))
    return row['query_id']
def starter_latest_all():
    """Drain the strategy table and crawl every pending batch record.

    Per loop iteration:
      1. pick one enabled, unscheduled strategy (state=0, source_type=1),
         insert a batch record for it, then mark the strategy state=1;
      2. pull one pending record from task_batch_record and run
         WosLatestIncrementSpider on it;
      3. stop when neither table yields any more work.
    """

    @defer.inlineCallbacks
    def f(running: bool = True):
        while running:
            client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
                                                         database='science_data_dept', user='science-data-dept',
                                                         passwd='datadept1509', )
            cursor = client.cursor(cursors.DictCursor)
            # Pick one pending task from the search-strategy table.
            cursor.execute('select id from task_search_strategy where disable_flag=0 and source_type=1 and state=0 limit 1')
            search_strategy_r = cursor.fetchone()
            if not search_strategy_r:
                # BUGFIX: the original only set running=False and closed the
                # cursor/connection, then FELL THROUGH to
                # search_strategy_r.get('id'), dereferencing None on a closed
                # cursor. Exit the loop immediately instead.
                cursor.close()
                client.close()
                break
            # Add the task to the batch-record table.
            strategy_id = search_strategy_r.get('id')
            insert_sql = CREATE_RECORD_SQL % {
                'batch_date': tools.get_today_date(),
                'query_id': strategy_id,
                'task_condition': '"AND PY=(2025-2026)"'
            }
            cursor.execute(insert_sql)
            # Mark the strategy as scheduled (state=1) so it is not re-picked.
            cursor.execute('update task_search_strategy set state=1, updated_time=CURRENT_TIMESTAMP where id=%s', (strategy_id,))
            cursor.connection.commit()
            # Fetch one batch record from the record table and execute it.
            cursor.execute(SELECT_RECORD_SQL % {'limit': 1})
            result = cursor.fetchone()
            if result:
                query_id = result['query_id']
                cursor.execute('select org_id, org_name from relation_org_query where query_id=%s', (query_id,))
                org_results: List[dict] = cursor.fetchall()
                result['org_id'] = [org_result['org_id'] for org_result in org_results]
                result['org_name'] = [org_result['org_name'] for org_result in org_results]
                init_params = result
                cursor.close()
                client.close()
                yield process.crawl(WosLatestIncrementSpider, task_obj=init_params)
                # NOTE(review): time.sleep blocks the Twisted reactor thread;
                # consider twisted.internet.task.deferLater for a non-blocking
                # pause -- confirm before changing scheduling behavior.
                time.sleep(60)
            else:
                running = False
                cursor.close()
                client.close()

    process = CrawlerProcess(get_project_settings())
    f(True)
    process.start()
    process.stop()
def starter_latest_one(strategy_id: int = None, org_name: str = None):
    """Run the incremental crawl for a single strategy.

    Exactly one of the two arguments is required: pass *strategy_id*
    directly, or pass *org_name* and it is resolved to a strategy id via
    get_strategy_id_by_name().

    NOTE(review): the body appears truncated in this view -- presumably
    the resolved strategy_id is then handed to the spider; confirm in
    the full file.

    Raises:
        Exception: when neither strategy_id nor org_name is supplied.
    """
    if strategy_id is None:
        if org_name is None:
            raise Exception('指定 strategy_id 或者 org_name')
        strategy_id = get_strategy_id_by_name(org_name)
def starter(): def starter():
process = CrawlerProcess(get_project_settings()) process = CrawlerProcess(get_project_settings())
process.crawl(WosLatestIncrementSpider) process.crawl(WosLatestIncrementSpider)

Loading…
Cancel
Save