wos:生产任务
parent
1b0da2c41e
commit
b47caf1c59
@ -0,0 +1,139 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2025/10/31 10:24
|
||||
# @Author : zhaoxiangpeng
|
||||
# @File : distribute_task.py
|
||||
|
||||
from typing import Any
|
||||
import pymysql
|
||||
from science_article_add.utils import tools
|
||||
|
||||
SELECT_STRATEGY_SQL = '''SELECT
|
||||
r.org_id, q.id, q.content, q.param, q.disable_flag, q.state
|
||||
FROM relation_org_query AS r JOIN task_search_strategy AS q
|
||||
ON r.query_id = q.id
|
||||
WHERE
|
||||
r.org_name="%(org_name)s" AND disable_flag = 0'''
|
||||
CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)'''
|
||||
|
||||
ORG_STRATEGY_SQL = """
|
||||
SELECT r.%(org_id)s, r.%(org_name)s, r.%(query_id)s, q.%(content)s, q.%(source_type)s
|
||||
FROM task_search_strategy AS q JOIN relation_org_query AS r ON r.query_id = q.id
|
||||
WHERE q.id = %(q_id)s
|
||||
"""
|
||||
ORG_STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']
|
||||
|
||||
|
||||
class CrawlTaskManager:
|
||||
def __init__(self):
|
||||
self.client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
|
||||
database='science_data_dept', user='science-data-dept',
|
||||
passwd='datadept1509', )
|
||||
|
||||
def execute_sql(self, sql):
|
||||
cursor = self.client.cursor()
|
||||
try:
|
||||
cursor.execute(sql)
|
||||
results = cursor.fetchall()
|
||||
return results
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
def find_task_by_school_name(self, school_name, source_type: int = None):
|
||||
cursor = self.client.cursor()
|
||||
try:
|
||||
# 查询
|
||||
select_fields = ['org_id', 'id', 'content', 'disable_flag', 'state']
|
||||
select_sql = 'select %()s from task_search_strategy as q join relation_org_query as r ON q.id = r.query_id where q.source_type = %(source_type)s'
|
||||
cursor.execute(
|
||||
select_sql
|
||||
)
|
||||
find_result = cursor.fetchall()
|
||||
|
||||
except pymysql.MySQLError as e:
|
||||
pass
|
||||
|
||||
def create_crawler_task(self, query_id: int, condition: Any = None, source_type: int = None):
|
||||
cursor = self.client.cursor()
|
||||
try:
|
||||
insert_sql = CREATE_RECORD_SQL % {
|
||||
'batch_date': tools.get_today_date(),
|
||||
'query_id': query_id,
|
||||
'task_condition': condition
|
||||
}
|
||||
cursor.execute(
|
||||
insert_sql
|
||||
)
|
||||
cursor.connection.commit()
|
||||
return cursor.lastrowid
|
||||
except pymysql.MySQLError as e:
|
||||
print(e)
|
||||
return None
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
def get_crawler_task(self, task_id: int = None, source_type: int = None, state: int = None):
|
||||
STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']
|
||||
cursor = self.client.cursor()
|
||||
try:
|
||||
record_fields = ['id', 'batch_date', 'query_id', 'task_condition', 'is_done']
|
||||
condition = {}
|
||||
if state is not None:
|
||||
condition['is_done'] = state
|
||||
else:
|
||||
condition['is_done'] = 0
|
||||
if task_id:
|
||||
condition['id'] = task_id
|
||||
|
||||
sql = "select %(fields)s from task_batch_record where %(condition)s" % {
|
||||
'fields': ', '.join(record_fields), 'condition': ' and '.join([f'{k}={v}' for k, v in condition.items()])
|
||||
}
|
||||
if source_type:
|
||||
pass
|
||||
cursor.execute(sql)
|
||||
result = cursor.fetchone()
|
||||
if result is None:
|
||||
return
|
||||
task_record_dic = dict(zip(record_fields, result))
|
||||
fill = dict(zip(STRATEGY_FIELDS, STRATEGY_FIELDS))
|
||||
fill.update(q_id=task_record_dic.get("query_id"))
|
||||
cursor.execute(
|
||||
ORG_STRATEGY_SQL % fill,
|
||||
)
|
||||
result = cursor.fetchone()
|
||||
task_dic = dict(zip(STRATEGY_FIELDS, result))
|
||||
task_dic.update(task_record_dic)
|
||||
return task_dic
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
def _build_condition(self, source_type: int = None):
|
||||
if source_type is None:
|
||||
source_type = 1
|
||||
if source_type == 1:
|
||||
condition = 'AND PY=()'
|
||||
|
||||
|
||||
# def test_create_one():
|
||||
# manager = CrawlTaskManager()
|
||||
# manager.create_crawler_task(1542, condition='NULL', source_type=1)
|
||||
|
||||
|
||||
def main():
|
||||
manager = CrawlTaskManager()
|
||||
rr = manager.execute_sql('select id from task_search_strategy where disable_flag=0 and source_type=1 and state=0 limit 20')
|
||||
# rr = manager.execute_sql('select id from task_search_strategy where disable_flag=0 and source_type=1 and id in (1124, 1148, 1159, 1162, 1163, 1164, 1534, 1535)')
|
||||
query_ids = []
|
||||
for c in rr:
|
||||
record_id = manager.create_crawler_task(c[0], condition='"AND PY=(2025-2026)"', source_type=1)
|
||||
query_ids.append(c[0])
|
||||
print(record_id)
|
||||
changed = [str(i) for i in query_ids]
|
||||
print(changed)
|
||||
ok = 'update task_search_strategy set state=1 where id in (%s)' % ', '.join(changed)
|
||||
print(ok)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Loading…
Reference in New Issue