diff --git a/science_article_add/science_article_add/spiders/wos_download.py b/science_article_add/science_article_add/spiders/wos_download.py
new file mode 100644
index 0000000..e98aebe
--- /dev/null
+++ b/science_article_add/science_article_add/spiders/wos_download.py
@@ -0,0 +1,149 @@
+import os
+from datetime import datetime
+from typing import List, Dict, Union, Any, Self
+
+import scrapy
+from scrapy.http.request.json_request import JsonRequest
+from scrapy.crawler import Crawler
+
+from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem
+from science_article_add.scripts.wos_parse_data import parse_full_records_txt
+from science_article_add.models import wos_model as model
+from science_article_add.utils import tools
+from science_article_add.configs import wos as config
+
+
+def _parse_download(body: Union[bytes, str]):
+    """
+    Parse the downloaded export content and yield article / cited-number items.
+    """
+    batch_time = datetime.now()
+    if isinstance(body, str):
+        body = body.encode()
+    for data_dic in parse_full_records_txt(body):
+        t_id = data_dic.pop('ut', None)
+        if not t_id:
+            continue
+        article_item = WosArticleItem()
+        article_item['third_id'] = t_id
+        article_item['exported'] = data_dic
+        article_item['updated_at'] = batch_time
+        yield article_item
+        # Parse the cited-by count ("tc" field)
+        if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
+            cited_item = WosCitedNumberItem()
+            cited_item['third_id'] = t_id
+            cited_item['cited'] = cited_num
+            cited_item['updated_at'] = batch_time
+            yield cited_item
+
+
+class WosDownloadSpider(scrapy.Spider):
+    name = "wos_download"
+    custom_settings = dict(
+        FILE_STORAGE_DIR=r"Y:\wos-metadata\wos increment-202512\03",
+        DOWNLOADER_MIDDLEWARES={
+            "science_article_add.middlewares.wos.WosSidParamMiddleware": 500
+        },
+        ITEM_PIPELINES={
+            "science_article_add.pipelines.mongo.MongoPipeline": 300,
+            "science_article_add.pipelines.verify_data.VerifyDataIntegrity": 400,
+        },
+        LOG_LEVEL="INFO"
+    )
+
+    def __init__(self, task_obj, file_storage_dir: str = None, **kwargs):
+        super().__init__(**kwargs)
+        self.file_storage_dir = file_storage_dir
+        self.id_list: List[Dict[str, str]] = task_obj
+        self._records_found = 0
+
+    @classmethod
+    def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
+        settings = crawler.settings
+        from pymongo import MongoClient
+        client = MongoClient(settings.get("MONGO_URI"))
+        db = client.get_database(settings.get("MONGO_DATABASE"))
+        collection = db.get_collection("todo_ids_wos")
+
+        def fetch_tasks():
+            # Claim up to 500 pending ids (state=0) and mark them in progress (state=2);
+            # if none are pending, retry the ids that are already marked in progress.
+            cursor = collection.find(filter={"state": 0}, projection={"state": 0}).limit(500)
+            tasks = list(cursor)
+            if not tasks:
+                cursor = collection.find(filter={"state": 2}, projection={"_id": 0, "state": 0}).limit(500)
+                tasks = list(cursor)
+            else:
+                _ids = [x.pop("_id", None) for x in tasks]
+                collection.update_many(filter={"_id": {"$in": _ids}}, update={"$set": {"state": 2}})
+            return tasks
+
+        kwargs["task_obj"] = fetch_tasks()
+        kwargs["file_storage_dir"] = settings.get("FILE_STORAGE_DIR")
+        client.close()
+        return super().from_crawler(crawler, *args, **kwargs)
+
+    def make_query(self) -> str:
+        # Build an advanced-search query such as "UT=(...) OR UT=(...)".
+        third_ids = [
+            '%s=(%s)' % (id_item.get('field', 'UT'), id_item.get('third_id'))
+            for id_item in self.id_list
+        ]
+        return ' OR '.join(third_ids)
+
+    def get_batch_ids(self) -> List[Dict[str, str]]:
+        return self.id_list
+
+    async def start(self):
+        os.makedirs(self.file_storage_dir, exist_ok=True)
+        query = self.make_query()
+        yield JsonRequest(
+            config.WOS_ADVANCED_SEARCH_API,
+            method='POST',
+            data=model.make_advanced_search_ut(query=query),
+            meta={'QUERY': query},  # forwarded by parse() to the export request
+        )
+
+    def parse(self, response, **kwargs):
+        meta = response.meta
+        query_id, records_found = model.get_record_info(response.body)
+        if (not query_id) or (records_found == 0):
+            self.logger.warning(
+                "No records found!\nResponse body: %s\nRequest: %s",
+                response.text, response.request)
+            return
+        self.set_records_found(records_found)
+        mark_start = 1
+        yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
+                          data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
+                                                               mark_to=records_found),
+                          meta={'QUERY_ID': query_id, 'QUERY': meta.get('QUERY'),
+                                'FILENAME': meta.get("FILENAME"),
+                                'RECORDS_FOUND': records_found, 'MARK_START': mark_start,
+                                'MARK_END': records_found},
+                          cb_kwargs=dict(filename=meta.get("FILENAME"), query_id=query_id),
+                          callback=self.download_parse)
+
+    def download_parse(self, response, filename: str = None, query_id: str = None, **kwargs):
+        # Persist the raw export file, then parse it into items.
+        filename = filename or query_id
+        file_export_path = os.path.join(self.file_storage_dir, '%s.txt' % filename)
+        with open(file_export_path, 'wb') as f:
+            f.write(response.body)
+        yield from _parse_download(response.body)
+
+    def set_records_found(self, val):
+        self._records_found = val
+
+    def get_records_found(self) -> int:
+        return self._records_found
+
+
+if __name__ == '__main__':
+    from scrapy.crawler import CrawlerProcess
+    from scrapy.utils.project import get_project_settings
+
+    process = CrawlerProcess(get_project_settings())
+    process.crawl(WosDownloadSpider, task_obj=[])
+    process.start()
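
For context, a minimal sketch of the task documents this spider expects in the "todo_ids_wos" MongoDB collection and of the advanced-search query string that make_query() builds from them. The document shape (field, third_id, state) and the state values are inferred from from_crawler() above; the UT values below are placeholders, not real records.

# Hypothetical task documents as stored in the "todo_ids_wos" collection.
# state: 0 = pending, 2 = claimed by a running spider (inferred from from_crawler()).
sample_tasks = [
    {"field": "UT", "third_id": "WOS:000000000000001", "state": 0},  # placeholder id
    {"field": "UT", "third_id": "WOS:000000000000002", "state": 0},  # placeholder id
]

def make_query(id_list):
    """Standalone copy of WosDownloadSpider.make_query, for illustration only."""
    return ' OR '.join(
        '%s=(%s)' % (item.get('field', 'UT'), item.get('third_id'))
        for item in id_list
    )

print(make_query(sample_tasks))
# -> UT=(WOS:000000000000001) OR UT=(WOS:000000000000002)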