You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
5.6 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# @Time : 2022/6/8 16:54
# @Author : ZhaoXiangPeng
# @File : inspec2mongo.py
from ReSpider.db.mongodb import AsyncMongoDB
from ReSpider.extend.logger import LogMixin
from doi_parse.inspec2redis import InspecToRedis
import asyncio
import os
import json
import time
import logging
# DOIs of interest (a set despite the name): a work whose own DOI is listed
# here, or that references a listed DOI, yields citation-relation rows.
DOI_LIST = {
    '10.19381/j.issn.1001-7585.2020.21.008',
    '10.14111/j.cnki.zgfx.2019.06.004',
    '10.13530/j.cnki.jlis.190047',
}
class AsyncBase(LogMixin):
    """Run an async callback over every entry of a task list with bounded concurrency.

    The callback registered via ``add_callback`` is invoked as
    ``call_func(task, semaphore)`` for each task popped from the task list.
    The callback must call ``semaphore.release()`` exactly once when it is
    done; a slot that is never released is lost, and once ``limit`` slots are
    lost no further tasks are started.
    """

    def __init__(self, task_list: list, limit: int = 6, loop=None):
        """
        :param task_list: tasks to process; consumed from the end with ``pop()``.
        :param limit: maximum number of callbacks running at the same time.
        :param loop: event loop to run on; defaults to the current event loop,
            or a newly created and installed one if none is set.
        """
        super().__init__()
        self.TASK_LIST: list = task_list
        self.loop = loop or self._default_loop()
        self.limit = limit
        # Strong references to in-flight tasks: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._pending_tasks = set()

    @staticmethod
    def _default_loop():
        """Return the current event loop, creating and installing one if none is set."""
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions raise here instead of implicitly creating a loop.
            new_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(new_loop)
            return new_loop

    def add_callback(self, func):
        """
        Register the coroutine function that processes a single task.

        :param func: called as ``func(task, semaphore)``; must release ``semaphore``.
        """
        self.call_func = func

    def _on_task_done(self, job):
        """Drop the finished task's reference and log any exception it raised."""
        self._pending_tasks.discard(job)
        if job.cancelled():
            return
        exc = job.exception()  # retrieving it also silences "never retrieved" warnings
        if exc is not None:
            self.logger.error('task failed %s' % exc, exc_info=exc)

    async def next_task(self):
        """Feed tasks to the callback until the list is drained and all work has finished.

        When the list is empty, it sleeps 3 s and stops once this coroutine is
        the only unfinished task left on the loop. Otherwise it polls again, so
        tasks appended to the list later are still picked up.
        """
        semaphore = asyncio.Semaphore(value=self.limit)
        while True:
            try:
                task = self.TASK_LIST.pop()
            except IndexError:
                await asyncio.sleep(3)
                if len(asyncio.all_tasks(loop=self.loop)) <= 1:
                    break
                continue
            # Blocks while `limit` callbacks are still holding a slot.
            await semaphore.acquire()
            job = self.loop.create_task(self.call_func(task, semaphore))
            self._pending_tasks.add(job)
            job.add_done_callback(self._on_task_done)

    def execute(self):
        """Run ``next_task`` to completion on ``self.loop``, logging any failure."""
        self.logger.info('TASK INIT SUCCESS.')
        try:
            self.loop.run_until_complete(
                self.next_task()
            )
        except Exception as e:
            self.logger.error('execute %s' % e, exc_info=True)
        finally:
            self.loop.run_until_complete(
                self.loop.shutdown_asyncgens())
            self.loop.stop()
            self.logger.info('THE END.')
class InspecToMongo(InspecToRedis):
    """Scan Crossref JSON dump files and store citation links involving ``DOI_LIST``.

    A ``{'doi': ..., 'ref_doi': ...}`` row is written to the
    ``data_crossref_doirelation`` collection for every DOI-bearing reference
    of a work when either the work's own DOI is in ``DOI_LIST`` (all of its
    DOI references are kept) or the referenced DOI is in ``DOI_LIST``.
    """

    def __init__(self, db=None, file_list=None, root: str = None):
        """
        :param db: async MongoDB wrapper used to store relation rows (``add_batch``).
        :param file_list: files to process; used as the task queue by ``start()``.
        :param root: stored as ``self.root`` (not read in this class; presumably
            used by inherited helpers — verify); defaults to the 2021-01 dump path.
        """
        super().__init__(file_list=file_list)
        self.db: AsyncMongoDB = db
        # Fall back to an empty list so the count below and start() work
        # when no files are given.
        self.file_list: list = file_list if file_list is not None else []
        print('初始化任务 %s' % len(self.file_list))
        self.root = root or 'H:/crossref_public_data_file_2021_01/'

    async def process(self, file=None, semaphore=None):
        """Parse one dump file and store its matching citation rows.

        :param file: file to process; when falsy, the next one comes from ``get_next()``.
        :param semaphore: concurrency slot acquired by ``AsyncBase``. It is released
            here exactly once, including on early return or error, so a bad file
            cannot starve the scheduler.
        """
        try:
            if not file and self.file_list:
                file = self.get_next()
            if not file:
                return
            print('******************** 当前处理文件 %s ********************' % file)
            io = self.compatible(file)
            # No semaphore is passed: the release happens in the finally below.
            await self.data_parse(io)
        finally:
            if semaphore is not None:
                semaphore.release()

    async def data_parse(self, io, semaphore=None):
        """Read a Crossref JSON document and write relation rows for each work.

        :param io: readable file object holding a JSON object with an ``items``
            list; it is always closed here.
        :param semaphore: optional concurrency slot, released once parsing ends
            (successfully or not); ``None`` means the caller handles the release.
        :raises KeyError: if the document has no ``items`` key.
        """
        try:
            try:
                data = json.load(io)
            finally:
                io.close()
            for item in data['items']:
                reference_list = self._matching_references(item)
                if reference_list:
                    await self.db.add_batch('data_crossref_doirelation', reference_list)
        finally:
            if semaphore is not None:
                semaphore.release()

    @staticmethod
    def _matching_references(item):
        """Return the ``{'doi', 'ref_doi'}`` rows to store for one Crossref work."""
        doi = item.get('DOI')
        if not item.get('reference-count', 0):
            return []
        # If the work itself is a DOI of interest, keep every DOI reference
        # (a work is unlikely to cite itself).
        keep_all = doi in DOI_LIST
        rows = []
        try:
            for reference in item.get('reference', []):
                ref_doi = reference.get('DOI')
                if not ref_doi:
                    continue  # references without a DOI are skipped
                if keep_all or ref_doi in DOI_LIST:
                    row = {'doi': doi, 'ref_doi': ref_doi}
                    print(row)
                    rows.append(row)
        except KeyError:
            # Unexpected record shape: show its keys and store nothing for it.
            print(item.keys())
            return []
        return rows

    def start(self, limit: int = 6):
        """Process all files, keeping at most ``limit`` files in flight at a time."""
        aio_task = AsyncBase(self.file_list, limit=limit)
        aio_task.add_callback(self.process)
        aio_task.execute()
if __name__ == '__main__':
    mongo = AsyncMongoDB(host='127.0.0.1', port=27017, db='data_crossref')
    # Only files from index 20000 onwards are processed
    # (presumably earlier ones were handled by a previous run — verify).
    pending = InspecToRedis.load_gz('H:/crossref_public_data_file_2021_01')[20000:]
    started_at = time.time()
    InspecToMongo(db=mongo, file_list=pending).start()
    elapsed = time.time() - started_at
    print('耗时 %s' % elapsed)