diff --git a/science_article_add/science_article_add/scripts/crawl_task.py b/science_article_add/science_article_add/scripts/crawl_task.py new file mode 100644 index 0000000..4b2efd2 --- /dev/null +++ b/science_article_add/science_article_add/scripts/crawl_task.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# @Time : 2025/10/31 10:24 +# @Author : zhaoxiangpeng +# @File : distribute_task.py + +from typing import Any +import pymysql +from science_article_add.utils import tools + +SELECT_STRATEGY_SQL = '''SELECT +r.org_id, q.id, q.content, q.param, q.disable_flag, q.state +FROM relation_org_query AS r JOIN task_search_strategy AS q +ON r.query_id = q.id +WHERE +r.org_name="%(org_name)s" AND disable_flag = 0''' +CREATE_RECORD_SQL = '''insert into task_batch_record (batch_date, query_id, task_condition) VALUES ("%(batch_date)s", %(query_id)s, %(task_condition)s)''' + +ORG_STRATEGY_SQL = """ +SELECT r.%(org_id)s, r.%(org_name)s, r.%(query_id)s, q.%(content)s, q.%(source_type)s +FROM task_search_strategy AS q JOIN relation_org_query AS r ON r.query_id = q.id +WHERE q.id = %(q_id)s +""" +ORG_STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type'] + + +class CrawlTaskManager: + def __init__(self): + self.client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306, + database='science_data_dept', user='science-data-dept', + passwd='datadept1509', ) + + def execute_sql(self, sql): + cursor = self.client.cursor() + try: + cursor.execute(sql) + results = cursor.fetchall() + return results + except Exception as e: + raise e + finally: + cursor.close() + + def find_task_by_school_name(self, school_name, source_type: int = None): + cursor = self.client.cursor() + try: + # 查询 + select_fields = ['org_id', 'id', 'content', 'disable_flag', 'state'] + select_sql = 'select %()s from task_search_strategy as q join relation_org_query as r ON q.id = r.query_id where q.source_type = %(source_type)s' + cursor.execute( + select_sql + ) + find_result = cursor.fetchall() + + except pymysql.MySQLError as e: + pass + + def create_crawler_task(self, query_id: int, condition: Any = None, source_type: int = None): + cursor = self.client.cursor() + try: + insert_sql = CREATE_RECORD_SQL % { + 'batch_date': tools.get_today_date(), + 'query_id': query_id, + 'task_condition': condition + } + cursor.execute( + insert_sql + ) + cursor.connection.commit() + return cursor.lastrowid + except pymysql.MySQLError as e: + print(e) + return None + finally: + cursor.close() + + def get_crawler_task(self, task_id: int = None, source_type: int = None, state: int = None): + STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type'] + cursor = self.client.cursor() + try: + record_fields = ['id', 'batch_date', 'query_id', 'task_condition', 'is_done'] + condition = {} + if state is not None: + condition['is_done'] = state + else: + condition['is_done'] = 0 + if task_id: + condition['id'] = task_id + + sql = "select %(fields)s from task_batch_record where %(condition)s" % { + 'fields': ', '.join(record_fields), 'condition': ' and '.join([f'{k}={v}' for k, v in condition.items()]) + } + if source_type: + pass + cursor.execute(sql) + result = cursor.fetchone() + if result is None: + return + task_record_dic = dict(zip(record_fields, result)) + fill = dict(zip(STRATEGY_FIELDS, STRATEGY_FIELDS)) + fill.update(q_id=task_record_dic.get("query_id")) + cursor.execute( + ORG_STRATEGY_SQL % fill, + ) + result = cursor.fetchone() + task_dic = dict(zip(STRATEGY_FIELDS, result)) + task_dic.update(task_record_dic) + return task_dic + finally: + cursor.close() + + def _build_condition(self, source_type: int = None): + if source_type is None: + source_type = 1 + if source_type == 1: + condition = 'AND PY=()' + + +# def test_create_one(): +# manager = CrawlTaskManager() +# manager.create_crawler_task(1542, condition='NULL', source_type=1) + + +def main(): + manager = CrawlTaskManager() + rr = manager.execute_sql('select id from task_search_strategy where disable_flag=0 and source_type=1 and state=0 limit 20') + # rr = manager.execute_sql('select id from task_search_strategy where disable_flag=0 and source_type=1 and id in (1124, 1148, 1159, 1162, 1163, 1164, 1534, 1535)') + query_ids = [] + for c in rr: + record_id = manager.create_crawler_task(c[0], condition='"AND PY=(2025-2026)"', source_type=1) + query_ids.append(c[0]) + print(record_id) + changed = [str(i) for i in query_ids] + print(changed) + ok = 'update task_search_strategy set state=1 where id in (%s)' % ', '.join(changed) + print(ok) + + +if __name__ == '__main__': + main() \ No newline at end of file