# -*- coding: utf-8 -*-
# @Time    : 2022/6/8 16:54
# @Author  : ZhaoXiangPeng
# @File    : inspec2mongo.py

from ReSpider.db.mongodb import AsyncMongoDB
from ReSpider.extend.logger import LogMixin
from doi_parse.inspec2redis import InspecToRedis
import asyncio
import os
import json
import time
import logging

# DOIs of interest. A Crossref item whose own DOI is in this set has all of its
# DOI-bearing references recorded; any other item only has the references that
# point at one of these DOIs recorded.
DOI_LIST = {'10.19381/j.issn.1001-7585.2020.21.008',
            '10.14111/j.cnki.zgfx.2019.06.004',
            '10.13530/j.cnki.jlis.190047'}


class AsyncBase(LogMixin):
    """Run an async callback over every item of a task list with bounded concurrency.

    Items are popped from the end of ``task_list`` (the list is consumed in
    place). For each item the callback registered via :meth:`add_callback` is
    scheduled as ``call_func(task, semaphore)``. The callback must call
    ``semaphore.release()`` exactly once when it finishes, which frees a slot
    for the next item.
    """

    def __init__(self, task_list: list, limit: int = 6, loop=None):
        super().__init__()
        self.TASK_LIST: list = task_list
        self.loop = loop or asyncio.get_event_loop()
        self.limit = limit
        # Callback registered by add_callback(); None until then.
        self.call_func = None
        # Strong references to scheduled tasks: the event loop keeps only weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._running: set = set()

    def add_callback(self, func):
        """Register the coroutine function that processes a single task.

        ``func(task, semaphore)`` must release ``semaphore`` when it finishes.
        """
        self.call_func = func

    def _on_task_done(self, task: asyncio.Task):
        """Forget a finished task and log its exception, if it raised one."""
        self._running.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self.logger.error('task failed: %s' % exc, exc_info=exc)

    async def next_task(self):
        """Dispatch tasks until the list is empty and no other task is left on the loop."""
        if self.call_func is None:
            raise RuntimeError('no callback registered; call add_callback() first')
        semaphore = asyncio.Semaphore(value=self.limit)
        while True:
            try:
                task = self.TASK_LIST.pop()
            except IndexError:
                # Nothing to dispatch right now: wait, then stop once this
                # coroutine is the only task left. Otherwise loop again, in case
                # running callbacks appended more work to the list.
                await asyncio.sleep(3)
                if len(asyncio.all_tasks(loop=self.loop)) <= 1:
                    break
                continue
            await semaphore.acquire()
            running = self.loop.create_task(self.call_func(task, semaphore))
            self._running.add(running)
            running.add_done_callback(self._on_task_done)

    def execute(self):
        """Run :meth:`next_task` to completion on ``self.loop``, logging any failure."""
        self.logger.info('TASK INIT SUCCESS.')
        try:
            self.loop.run_until_complete(
                self.next_task()
            )
        except Exception as e:
            self.logger.error('execute %s' % e, exc_info=True)
        finally:
            self.loop.run_until_complete(
                self.loop.shutdown_asyncgens())
            self.loop.stop()
        self.logger.info('THE END.')


class InspecToMongo(InspecToRedis):
    """Store citation links that involve ``DOI_LIST`` from dump files into MongoDB.

    Each file is parsed as a JSON document with an ``items`` list. Matching
    ``{'doi': ..., 'ref_doi': ...}`` pairs are written to the
    ``data_crossref_doirelation`` collection via ``db.add_batch``.
    """

    def __init__(self, db=None, file_list=None, root: str = None):
        super().__init__(file_list=file_list)
        self.db: AsyncMongoDB = db
        # Fall back to an empty list so len()/pop() work when no files are given.
        self.file_list: list = file_list if file_list is not None else []
        print('初始化任务 %s 个' % len(self.file_list))
        self.root = root or 'H:/crossref_public_data_file_2021_01/'

    async def process(self, file=None, semaphore=None):
        """Parse one file and store its citation links.

        :param file: file to process. When omitted and ``file_list`` is
            non-empty, the next one is taken from ``self.get_next()``
            (inherited from ``InspecToRedis``).
        :param semaphore: concurrency slot handed out by ``AsyncBase``. It is
            always released when this coroutine exits, including on error and
            when there is no file to process.
        """
        try:
            if not file and self.file_list:
                file = self.get_next()
            if not file:
                return
            print('******************** 当前处理文件 %s ********************' % file)
            io = self.compatible(file)
            # The slot is released in ``finally`` below, not by data_parse.
            await self.data_parse(io)
        finally:
            if semaphore is not None:
                semaphore.release()  # free the slot for the next file

    async def data_parse(self, io, semaphore=None):
        """Read a JSON document from ``io`` and insert its relevant citation links.

        ``io`` is closed after reading, even if parsing fails. If ``semaphore``
        is given, it is released when this coroutine exits, whether or not
        processing succeeded.
        """
        try:
            try:
                data = json.load(io)
            finally:
                io.close()
            for item in data.get('items') or []:
                reference_list = self._collect_references(item)
                if reference_list:
                    await self.db.add_batch('data_crossref_doirelation', reference_list)
        finally:
            if semaphore is not None:
                semaphore.release()

    @staticmethod
    def _collect_references(item: dict) -> list:
        """Return the ``{'doi', 'ref_doi'}`` pairs of ``item`` that involve ``DOI_LIST``.

        If the item's own DOI is in ``DOI_LIST``, every reference that carries a
        DOI is kept. Otherwise only references pointing at a DOI in ``DOI_LIST``
        are kept. Items whose ``reference-count`` is 0 or missing yield nothing.
        """
        if not item.get('reference-count', 0):
            return []
        doi = item.get('DOI')
        # A listed work is unlikely to cite itself, so all its references are kept.
        cites_all = doi in DOI_LIST
        pairs = []
        for reference in item.get('reference') or []:
            ref_doi = reference.get('DOI')
            if not ref_doi:
                continue  # references without a DOI cannot be linked
            if cites_all or ref_doi in DOI_LIST:
                pair = {'doi': doi, 'ref_doi': ref_doi}
                print(pair)
                pairs.append(pair)
        return pairs

    def start(self, limit: int = 6):
        """Process every file in ``self.file_list`` with at most ``limit`` in flight."""
        aio_task = AsyncBase(self.file_list, limit=limit)
        aio_task.add_callback(self.process)
        aio_task.execute()


if __name__ == '__main__':
    mdb = AsyncMongoDB(host='127.0.0.1', port=27017, db='data_crossref')
    files = InspecToRedis.load_gz('H:/crossref_public_data_file_2021_01')
    # NOTE(review): skips the first 20000 files, presumably already processed
    # in an earlier run. Confirm before reusing.
    files = files[20000:]
    s = time.time()
    i2m = InspecToMongo(
        db=mdb,
        file_list=files
    )
    i2m.start()
    print('耗时 %s 秒' % (time.time() - s))