# --- Repository page header (web-scrape residue, kept as comments so the file parses) ---
# You cannot select more than 25 topics. Topics must start with a letter or number,
# can include dashes ('-') and can be up to 35 characters long.
# 175 lines | 5.4 KiB | Python
# -*- coding: utf-8 -*-
# @Time : 2022/5/31 10:02
# @Author : ZhaoXiangPeng
# @File : inspec2redis.py
from ReSpider.db.redisdb import RedisDB
from doi_parse.gz import un_gz
from queue import Queue
import pandas as pd
import os
import time
import json
from concurrent.futures import ThreadPoolExecutor
class Counter:
    """Per-journal reference counter backed by Redis hashes (db=2 by default)."""

    def __init__(self, clint=None):
        # NOTE: parameter name 'clint' (sic) is kept for backward compatibility
        # with existing callers.
        self.client = clint  # expected to behave like a RedisDB instance
        if clint is None:
            self.client = RedisDB(db=2)

    def incr(self, key: list, counter: list):
        """Increment article counters for each whitelisted journal key.

        key: redis keys uniquely identifying a journal (e.g. ISSNs)
        counter: values uniquely identifying an article (DOI or title)
        """
        pipe = self.client._redis.pipeline()
        pipe.multi()
        for journal in key:
            journal = journal.upper()
            # Only count journals registered in the inspec whitelist set.
            if not self.sismember('inspec:journals', journal):
                continue
            for article in counter:
                pipe.hincrby(journal, article)
        pipe.execute()

    def decr(self, key: str, counter: str, amount=1):
        """Decrement the counter by `amount` (default 1); returns the new value."""
        return self.client.hincrby(key, counter, amount=-amount)

    def get_cnt(self, key: str, counter: str):
        """Return the current value of the counter stored at hash `key`."""
        return self.client.hget(key, counter)

    def sismember(self, key, value):
        """Return whether `value` is a member of the redis set `key`."""
        return self.client.sismember(key, value)
class InspecToRedis:
    """Parse Crossref JSON dump files and accumulate, per journal ISSN,
    how often each referenced DOI is cited.

    Workflow:
      1. load the file list
      2. decompress (.gz) where needed
      3. parse the JSON items
      4. store counts in redis:
         4.1 register the ISSN whitelist
         4.2 per-ISSN hash of {doi: citation count}
    """

    def __init__(self, counter=None, file_list=None, root: str = None):
        self.counter = counter  # Counter instance used to persist counts
        self.file_list: list = file_list
        print('初始化任务 %s' % len(file_list))
        # self.to_queue(file_list)
        self.root = root or 'H:/crossref_public_data_file_2021_01/'

    @staticmethod
    def load_gz(file_path):
        """Return the names of the files under `file_path`."""
        return os.listdir(file_path)

    def inspec2redis(self):
        """Load the inspec journal list CSV and register its ISSNs in redis."""
        df = pd.read_csv('F:/工作数据存储2022/20220526_inspec测试/inspec期刊列表2.csv')
        issn_list = df['ISSN'].values.tolist()
        return self.counter.client.sadd('inspec:journals', issn_list)

    def get_next(self):
        """Pop and return the next file name from the task list."""
        return self.file_list.pop()

    def compatible(self, file):
        """Open `file` under self.root, transparently un-gzipping *.gz files."""
        if file.endswith('gz'):
            return un_gz(self.root + file)
        return open(self.root + file, encoding='utf-8')

    def data_parse(self, io):
        """Parse one Crossref JSON dump from the file-like `io` and count
        each referenced DOI against the item's journal ISSN(s)."""
        data = json.load(io)
        for item in data['items']:
            issn = item.get('ISSN')
            ref_count = item.get('reference-count', 0)
            # Skip items with no references, and items with no journal
            # identifier: Counter.incr iterates the ISSN list, so passing
            # None would crash.
            if not ref_count or not issn:
                continue
            reference_list = []
            for reference in item.get('reference', []):
                ref_doi = reference.get('DOI')
                if ref_doi:
                    reference_list.append(ref_doi)
                # References carrying only 'article-title'/'journal-title'
                # are deliberately not counted (matches previous behavior).
            self.counter.incr(issn, reference_list)

    def pro(self, file=None):
        """Process a single file: open, parse, close.

        When `file` is None, pops the next one from the task list;
        returns silently when there is nothing left to do.
        """
        if not file and len(self.file_list):
            file = self.get_next()
        if not file:
            return
        print('******************** 当前处理文件 %s ********************' % file)
        io = self.compatible(file)
        try:
            self.data_parse(io)
        finally:
            # Always release the file handle, even if parsing fails.
            io.close()

    def batch(self):
        """Process every file concurrently with a small thread pool."""
        with ThreadPoolExecutor(max_workers=2) as executor:
            executor.map(self.pro, self.file_list)

    def start(self):
        """Process files sequentially until the task list is empty."""
        index_count = 0
        while len(self.file_list):
            index_count += 1
            print('当前处理第 %s' % index_count)
            self.pro()
if __name__ == '__main__':
    # Build the counter (connects to redis db=2 by default).
    counter = Counter()
    # files = InspecToRedis.load_gz('H:/crossref_public_data_file_2021_01')
    # files = files[40000:]
    files = ['0.json']
    job = InspecToRedis(counter=counter, file_list=files)
    started = time.time()
    job.batch()
    # job.inspec2redis()
    print('耗时 %s' % (time.time() - started))