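"""Scrapy spider that bulk-downloads Web of Science (WoS) full-record exports.

It claims a batch of pending IDs from MongoDB, runs an advanced search against
the WoS API, exports the matching records to a text file on disk, and yields
article and cited-number items parsed from that export.
"""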
import os
import json

from datetime import datetime
from typing import List, Dict, Union, Any, Self

import scrapy
from scrapy.http.request.json_request import JsonRequest
from scrapy.crawler import Crawler

from science_article_add.items.wos import WosArticleItem, WosCitedNumberItem
from science_article_add.scripts.wos_parse_data import parse_full_records_txt
from science_article_add.models import wos_model as model
from science_article_add.utils import tools
from science_article_add.configs import wos as config


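# Note: parse_full_records_txt() is assumed to yield one dict per exported
# record, keyed by lowercase WoS field tags; only 'ut' (accession number) and
# 'tc' (times cited) are read explicitly, the rest is stored as-is in the
# item's 'exported' field.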
def _parse_download(body: Union[bytes, str]):
    """
    Parse the exported download content of a response.

    Yields WosArticleItem objects and, when a citation count is present,
    WosCitedNumberItem objects.
    """
    batch_time = datetime.now()
    if isinstance(body, str):
        body = body.encode()
    item_g = parse_full_records_txt(body)
    parse_count = 0
    for data_dic in item_g:
        t_id = data_dic.pop('ut', None)
        if t_id:
            parse_count += 1
            article_item = WosArticleItem()
            article_item['third_id'] = t_id
            article_item['exported'] = data_dic
            article_item['updated_at'] = batch_time
            yield article_item
            # Parse the cited-by count
            if cited_num := tools.str2int(data_dic.get("tc", 0), 0):
                cited_item = WosCitedNumberItem()
                cited_item['third_id'] = t_id
                cited_item['cited'] = cited_num
                cited_item['updated_at'] = batch_time
                yield cited_item


class WosDownloadSpider(scrapy.Spider):
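    """Download WoS full-record exports for one batch of pending IDs.

    Workflow: from_crawler() claims up to 500 pending IDs from the
    todo_ids_wos collection, start() issues an advanced-search request,
    parse() requests an export of all matching records, and download_parse()
    writes the raw export to file_storage_dir and yields the parsed items.
    """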
    name = "wos_download"
    custom_settings = dict(
        FILE_STORAGE_DIR=r"Y:\wos-metadata\wos increment-202512\03",
        DOWNLOADER_MIDDLEWARES={
            "science_article_add.middlewares.wos.WosSidParamMiddleware": 500
        },
        ITEM_PIPELINES={
            "science_article_add.pipelines.mongo.MongoPipeline": 300,
            "science_article_add.pipelines.verify_data.VerifyDataIntegrity": 400,
        },
        LOG_LEVEL="INFO"
    )

    def __init__(self, task_obj, file_storage_dir: str = None, **kwargs):
        super().__init__(**kwargs)
        self.file_storage_dir = file_storage_dir
        self.id_list: List[Dict[str, str]] = task_obj
        self._records_found = 0

    @classmethod
    def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
        settings = crawler.settings
        from pymongo import MongoClient
        client = MongoClient(settings.get("MONGO_URI"))
        db = client.get_database(settings.get("MONGO_DATABASE"))
        collection = db.get_collection("todo_ids_wos")

        def claim_batch():
            # Take up to 500 pending IDs (state == 0) and mark them as
            # in-progress (state == 2); if none are pending, retry IDs that
            # are already marked in-progress.
            cursor = collection.find(filter={"state": 0}, projection={"state": 0}).limit(500)
            d = list(cursor)
            if not d:
                cursor = collection.find(filter={"state": 2}, projection={"_id": 0, "state": 0}).limit(500)
                d = list(cursor)
            else:
                _ids = [x.pop("_id", None) for x in d]
                collection.update_many(filter={"_id": {"$in": _ids}}, update={"$set": {"state": 2}})
            return d

        tasks = claim_batch()
        kwargs.update({"task_obj": tasks})
        kwargs['file_storage_dir'] = settings.get("FILE_STORAGE_DIR")
        return super().from_crawler(crawler, *args, **kwargs)

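    # Each todo document is assumed to look roughly like
    #     {"third_id": "WOS:000123456700001", "field": "UT", "state": 0}
    # (the ID value is a made-up example). make_query() turns every entry into
    # an advanced-search clause such as UT=(WOS:000123456700001), joined by OR.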
    def make_query(self) -> str:
        third_ids = []
        for idT in self.id_list:
            third_ids.append('%s=(%s)' % (idT.get('field', 'UT'), idT.get('third_id')))
        todo_query = ' OR '.join(third_ids)
        return todo_query

    def get_batch_ids(self) -> List[Dict[str, str]]:
        return self.id_list

    async def start(self):
        if not os.path.exists(self.file_storage_dir):
            os.makedirs(self.file_storage_dir)

        qu = self.make_query()
        yield JsonRequest(
            config.WOS_ADVANCED_SEARCH_API, method='POST', data=model.make_advanced_search_ut(query=qu),
        )

    def parse(self, response, **kwargs):
        meta = response.meta
        request = response.request
        query_id, records_found = model.get_record_info(response.body)
        if (not query_id) or (records_found == 0):
            self.logger.warning(
                "No records found!\n"
                "Error info: %s\n"
                "Request info: %s" % (response.text, request))
            return
        else:
            self.set_records_found(records_found)
            mark_start = 1
            yield JsonRequest(config.WOS_EXPORT_FILE_API, method='POST',
                              data=model.export_search_data_to_txt(query_id, mark_from=mark_start,
                                                                   mark_to=records_found),
                              meta={'QUERY_ID': query_id, 'QUERY': meta.get('QUERY'),
                                    'FILENAME': meta.get("FILENAME"),
                                    'RECORDS_FOUND': records_found, 'MARK_START': mark_start,
                                    'MARK_END': records_found},
                              cb_kwargs=dict(filename=meta.get("FILENAME"), query_id=query_id),
                              callback=self.download_parse)

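    # Export callback: write the raw export to <file_storage_dir>/<query_id>.txt,
    # then yield the items parsed from it.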
    def download_parse(self, response, query_id: str = None, **kwargs):
        filename = query_id or response.meta.get('FILENAME')
        file_export_path = os.path.join(self.file_storage_dir, '%s.txt' % filename)
        with open(file_export_path, 'wb') as f:
            f.write(response.body)
        yield from _parse_download(response.body)

    def set_records_found(self, val):
        self._records_found = val

    def get_records_found(self) -> int:
        return self._records_found


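# Usage note: with a standard Scrapy project layout the spider can also be run
# as `scrapy crawl wos_download`; MONGO_URI and MONGO_DATABASE must be present
# in the project settings for from_crawler() to claim a batch of IDs.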
if __name__ == '__main__':
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    process = CrawlerProcess(get_project_settings())
    # task_obj is only a placeholder here; from_crawler() replaces it with the
    # batch of IDs claimed from MongoDB.
    process.crawl(WosDownloadSpider, task_obj=[])
    process.start()