|
|
# -*- coding: utf-8 -*-
|
|
|
# @Time : 2022/6/8 16:54
|
|
|
# @Author : ZhaoXiangPeng
|
|
|
# @File : inspec2mongo.py
|
|
|
|
|
|
from ReSpider.db.mongodb import AsyncMongoDB
|
|
|
from ReSpider.extend.logger import LogMixin
|
|
|
from doi_parse.inspec2redis import InspecToRedis
|
|
|
import asyncio
|
|
|
import os
|
|
|
import json
|
|
|
import time
|
|
|
import logging
|
|
|
|
|
|
|
|
|
# Target DOIs. InspecToMongo.data_parse records a citation link whenever either
# the citing work or one of its references has a DOI in this set.
DOI_LIST = {
    '10.19381/j.issn.1001-7585.2020.21.008',
    '10.14111/j.cnki.zgfx.2019.06.004',
    '10.13530/j.cnki.jlis.190047',
}
|
|
|
|
|
|
|
|
|
class AsyncBase(LogMixin):
    """Drain a shared task list through an async callback with bounded concurrency.

    Items are popped from the END of ``task_list`` (so they are handled in
    reverse order), and each one is scheduled as ``call_func(task, semaphore)``
    on ``self.loop``.

    Contract with the callback: a semaphore permit is acquired *before* the
    callback is scheduled and is NOT released by this class. The callback must
    call ``semaphore.release()`` exactly once when it finishes (ideally in a
    ``finally``). Otherwise the slot is lost, and after ``limit`` such leaks the
    dispatcher blocks forever on ``acquire()``.
    """

    def __init__(self, task_list: list, limit: int = 6, loop=None):
        """
        :param task_list: mutable list of work items; it is consumed via ``pop()``.
        :param limit: maximum number of callbacks running at the same time.
        :param loop: event loop to run on; defaults to ``asyncio.get_event_loop()``.
        """
        super().__init__()
        self.TASK_LIST: list = task_list
        self.loop = loop or asyncio.get_event_loop()
        self.limit = limit
        # Strong references to in-flight callback tasks. The event loop keeps
        # only weak references, so an unreferenced task can be garbage
        # collected before it finishes.
        self._tasks: set = set()

    def add_callback(self, func):
        """
        Register the coroutine function that processes one task.

        ``func`` is called as ``func(task, semaphore)`` and must release
        ``semaphore`` once when it is done.
        """
        self.call_func = func

    def _on_task_done(self, task):
        """Forget a finished callback task and log any exception it raised."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None:
            self.logger.error('task failed: %s' % exc,
                              exc_info=(type(exc), exc, exc.__traceback__))

    async def next_task(self):
        """Dispatch every queued task, then wait until all of them have finished.

        While callbacks are still running, the queue keeps being polled, so any
        items appended to ``TASK_LIST`` in the meantime are also dispatched.

        :raises RuntimeError: if no callback was registered with add_callback().
        """
        call_func = getattr(self, 'call_func', None)
        if call_func is None:
            raise RuntimeError('add_callback() must be called before execute()')
        semaphore = asyncio.Semaphore(value=self.limit)
        while True:
            try:
                task = self.TASK_LIST.pop()
            except IndexError:
                # There is no await between pop() and this check. An empty queue
                # plus no in-flight callbacks therefore really means "done".
                if not self._tasks:
                    break
                # Wake as soon as one callback finishes (it might enqueue more
                # work), or after 3 seconds at the latest.
                await asyncio.wait(set(self._tasks), timeout=3,
                                   return_when=asyncio.FIRST_COMPLETED)
                continue
            await semaphore.acquire()
            running = self.loop.create_task(call_func(task, semaphore))
            self._tasks.add(running)
            running.add_done_callback(self._on_task_done)

    def execute(self):
        """Run the dispatcher to completion on ``self.loop``.

        Errors raised by the dispatcher itself are logged, not re-raised. The
        loop is neither stopped nor closed, so a caller-supplied loop can be
        reused afterwards.
        """
        self.logger.info('TASK INIT SUCCESS.')
        try:
            self.loop.run_until_complete(self.next_task())
        except Exception as e:
            self.logger.error('execute %s' % e, exc_info=True)
        finally:
            self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            # Deliberately no loop.stop(): calling stop() on a loop that is not
            # running makes the NEXT run_until_complete() on it exit early
            # ("Event loop stopped before Future completed").
            self.logger.info('THE END.')
|
|
|
|
|
|
|
|
|
class InspecToMongo(InspecToRedis):
    """Scan Crossref JSON dump files and store citation links for target DOIs.

    For each work in a file, a ``{'doi': ..., 'ref_doi': ...}`` record is
    written to the ``data_crossref_doirelation`` collection when either the
    citing work (``doi``) or the cited reference (``ref_doi``) is in
    ``DOI_LIST``.
    """

    def __init__(self, db=None, file_list=None, root: str = None):
        """
        :param db: async Mongo wrapper used for ``add_batch`` writes.
        :param file_list: files to process; :meth:`start` consumes it via ``pop()``.
        :param root: data directory; defaults to the 2021-01 Crossref dump path.
        """
        super().__init__(file_list=file_list)
        self.db: AsyncMongoDB = db
        # Treat a missing list as empty instead of crashing on len(None).
        self.file_list: list = file_list if file_list is not None else []
        print('初始化任务 %s 个' % len(self.file_list))
        self.root = root or 'H:/crossref_public_data_file_2021_01/'

    async def process(self, file=None, semaphore=None):
        """Parse one file and store its citation links.

        :param file: file to process. If falsy and ``self.file_list`` is
            non-empty, the next file comes from ``self.get_next()``.
        :param semaphore: optional concurrency permit. It is released exactly
            once when this call ends: on success, on early return, or on error.
        """
        try:
            if not file and self.file_list:
                file = self.get_next()
            if not file:
                return
            print('******************** 当前处理文件 %s ********************' % file)
            io = self.compatible(file)
            # No semaphore is passed on: the permit is released in `finally` below.
            await self.data_parse(io)
        finally:
            if semaphore is not None:
                semaphore.release()

    async def data_parse(self, io, semaphore=None):
        """Extract target-DOI citation pairs from one Crossref JSON document.

        :param io: readable file object holding a JSON object with an ``items``
            list. It is always closed, even when parsing fails.
        :param semaphore: optional permit, released once when this call ends.
        :raises KeyError: if the document has no ``items`` key.
        """
        try:
            try:
                data = json.load(io)
            finally:
                io.close()
            # DOI names are case-insensitive, so compare lower-cased forms.
            targets = {d.lower() for d in DOI_LIST}
            for item in data['items']:
                # Works that report no references cannot yield any link.
                if not item.get('reference-count', 0):
                    continue
                reference_list = self._collect_links(item, targets)
                if reference_list:
                    await self.db.add_batch('data_crossref_doirelation', reference_list)
        finally:
            if semaphore is not None:
                semaphore.release()

    @staticmethod
    def _collect_links(item, targets):
        """Return ``{'doi', 'ref_doi'}`` dicts linking ``item`` to target DOIs.

        If the work itself is a target, every reference that has a DOI is
        linked (a work is unlikely to cite itself). Otherwise only references
        whose DOI is a target are kept. References without a DOI are skipped.
        ``targets`` must contain lower-cased DOIs.
        """
        doi = item.get('DOI')
        is_target = bool(doi) and doi.lower() in targets
        links = []
        for reference in item.get('reference', []):
            ref_doi = reference.get('DOI')
            if not ref_doi:
                continue
            if is_target or ref_doi.lower() in targets:
                link = {'doi': doi, 'ref_doi': ref_doi}
                print(link)
                links.append(link)
        return links

    def start(self, limit: int = 6):
        """Process every file in ``self.file_list`` with at most ``limit`` files in flight."""
        aio_task = AsyncBase(self.file_list, limit=limit)
        aio_task.add_callback(self.process)
        aio_task.execute()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry: extract citation links for DOI_LIST from the Crossref dump.
    mdb = AsyncMongoDB(host='127.0.0.1', port=27017, db='data_crossref')
    files = InspecToRedis.load_gz('H:/crossref_public_data_file_2021_01')
    # Skip the first 20000 files. Presumably a resume point for earlier runs;
    # confirm before reuse.
    files = files[20000:]
    # files = ['0.json']
    # perf_counter() is monotonic, so wall-clock adjustments cannot skew the timing.
    s = time.perf_counter()
    i2m = InspecToMongo(
        db=mdb,
        file_list=files,
    )
    i2m.start()
    # i2r.inspec2redis()
    print('耗时 %s 秒' % (time.perf_counter() - s))
|