You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
175 lines
5.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
# @Time : 2022/5/31 10:02
|
|
# @Author : ZhaoXiangPeng
|
|
# @File : inspec2redis.py
|
|
|
|
from ReSpider.db.redisdb import RedisDB
|
|
from doi_parse.gz import un_gz
|
|
from queue import Queue
|
|
import pandas as pd
|
|
import os
|
|
import time
|
|
import json
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
|
class Counter:
    """Per-journal counter stored in Redis hashes.

    Each journal key (e.g. an ISSN) that belongs to the ``inspec:journals``
    set is a Redis hash. Its fields are article identifiers (DOI or title),
    and each field's value is incremented once per occurrence passed to
    ``incr``.
    """

    def __init__(self, clint=None):
        """
        :param clint: an already-configured ``RedisDB`` client. The parameter
            name (sic) is kept for backward compatibility. When omitted, a new
            ``RedisDB(db=2)`` is created.
        """
        self.client: RedisDB = clint
        if clint is None:
            self.client = RedisDB(db=2)

    def incr(self, key: list, counter: list):
        """Increment every ``counter`` field under each journal in ``key``.

        All increments are queued on one pipeline and sent with a single
        ``execute``.

        key: ids usable as a unique journal Redis key (e.g. ISSNs). Each is
            upper-cased and only counted if it is a member of the
            ``inspec:journals`` set. A single string is treated as one key.
        counter: ids uniquely identifying articles (DOI or title). Each
            occurrence adds 1 to that field. A single string is treated as one
            id.

        Does nothing when ``key`` or ``counter`` is empty or ``None``. An
        example is a Crossref record without an ``ISSN`` field, which
        previously raised TypeError.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(key, str):
            key = [key]
        if isinstance(counter, str):
            counter = [counter]
        # Nothing to count: skip the per-key membership round trips entirely.
        if not key or not counter:
            return
        pipe = self.client._redis.pipeline()
        pipe.multi()
        for k in key:
            k = k.upper()
            # Only journals registered in the Inspec set are counted.
            if not self.sismember('inspec:journals', k):
                continue
            for value in counter:
                pipe.hincrby(k, value)
        pipe.execute()

    def decr(self, key: str, counter: str, amount=1):
        """Decrement field ``counter`` of hash ``key`` by ``amount``; return the client's result."""
        return self.client.hincrby(key, counter, amount=-amount)

    def get_cnt(self, key: str, counter: str):
        """Return the current value of field ``counter`` in hash ``key``.

        The value is whatever the client's ``hget`` returns; with raw redis-py
        that is presumably bytes/str or None (verify against RedisDB).
        """
        return self.client.hget(key, counter)

    def sismember(self, key, value):
        """Return whether ``value`` is a member of the Redis set ``key``."""
        return self.client.sismember(key, value)
|
|
|
|
|
|
class InspecToRedis:
    """Count, per Inspec journal, the DOIs cited by records in Crossref JSON dumps.

    Steps:
        1. load the file list
        2. decompress (``.gz`` files)
        3. parse
        4. store in Redis
            4.1 link each record to its ISSN(s)
            4.2 one hash per ISSN, cited DOI -> count, e.g.
                ISSN1
                    doi1 1
                    doi2 1
                ISSN2
                    doi1 2
                    doi2 1
    """

    def __init__(self, counter=None, file_list=None, root: str = None):
        """
        :param counter: object with an ``incr(key, counter)`` method (e.g.
            ``Counter``) used to store the counts.
        :param file_list: file names, relative to ``root``, to process. The
            list object is kept and popped by ``start``/``pro``. Defaults to an
            empty list (the original crashed on ``len(None)``).
        :param root: directory holding the files. Defaults to the local
            Crossref 2021-01 public data dump directory.
        """
        self.counter = counter  # counter used to persist the counts
        self.file_list: list = file_list if file_list is not None else []
        print('初始化任务 %s 个' % len(self.file_list))
        self.root = root or 'H:/crossref_public_data_file_2021_01/'

    @staticmethod
    def load_gz(file_path):
        """Return the entry names in directory ``file_path`` (``os.listdir``)."""
        return os.listdir(file_path)

    def inspec2redis(self, csv_path: str = 'F:/工作数据存储2022/20220526_inspec测试/inspec期刊列表2.csv'):
        """Add the journal ISSNs from a CSV to the ``inspec:journals`` Redis set.

        :param csv_path: CSV file with an ``ISSN`` column.
        :return: whatever the client's ``sadd`` returns.
        """
        df = pd.read_csv(csv_path)
        # Drop empty cells (NaN). Normalise to stripped upper case, because
        # Counter.incr upper-cases ISSNs before checking set membership.
        issn_list = df['ISSN'].dropna().astype(str).str.strip().str.upper().tolist()
        return self.counter.client.sadd('inspec:journals', issn_list)

    def get_next(self):
        """Pop and return the last file name in ``self.file_list``."""
        return self.file_list.pop()

    def compatible(self, file):
        """Open ``file`` (relative to ``self.root``) for reading; the caller must close it.

        Names ending in ``gz`` go through ``un_gz``, which presumably yields
        a readable text stream of the decompressed JSON (TODO confirm). Any
        other file is opened as UTF-8 text.
        """
        # os.path.join also works when root lacks a trailing separator.
        path = os.path.join(self.root, file)
        if file.endswith('gz'):
            return un_gz(path)
        return open(path, encoding='utf-8')

    def data_parse(self, io):
        """Count the cited DOIs of every record in one Crossref JSON stream.

        :param io: readable stream whose JSON object has an ``items`` list of
            record dicts.

        Records with a zero or missing ``reference-count`` are skipped.
        Records without an ``ISSN`` are also skipped, because they cannot be
        attributed to a journal.
        For the rest, the ``DOI`` of each ``reference`` entry is collected.
        References without a DOI are ignored. The DOIs are passed to
        ``self.counter.incr`` together with the record's ISSN value(s).
        """
        data = json.load(io)
        for item in data['items']:
            if not item.get('reference-count', 0):
                continue
            issn = item.get('ISSN')
            if not issn:
                continue
            reference_list = [
                ref['DOI'] for ref in item.get('reference', []) if ref.get('DOI')
            ]
            # Skip the Redis round trips when there is nothing to count.
            if reference_list:
                self.counter.incr(issn, reference_list)

    def pro(self, file=None):
        """Process one file: open it, parse it and update the counter.

        :param file: file name relative to ``self.root``. When falsy, the
            next name is popped from ``self.file_list``. If none remains,
            return without doing anything.
        """
        if not file and self.file_list:
            file = self.get_next()
        if not file:
            return
        print('******************** 当前处理文件 %s ********************' % file)
        fp = self.compatible(file)
        try:
            self.data_parse(fp)
        finally:
            # Close even when parsing or the counter fails, so handles do not
            # leak across many files.
            fp.close()

    def batch(self, max_workers: int = 2):
        """Process every file in ``self.file_list`` on a thread pool.

        ``Executor.map`` re-raises a worker's exception only when that result
        is consumed. The original discarded the iterator, so failures
        vanished silently. Each file's future is now checked, and failures are
        printed and returned.

        :param max_workers: number of worker threads.
        :return: names of the files whose processing raised.
        """
        files = list(self.file_list)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [(file, executor.submit(self.pro, file)) for file in files]
        failed = []
        for file, future in futures:
            exc = future.exception()
            if exc is not None:
                print('failed to process %s: %r' % (file, exc))
                failed.append(file)
        return failed

    def start(self):
        """Process ``self.file_list`` sequentially, popping until it is empty."""
        index_count = 0
        while self.file_list:
            index_count += 1
            print('当前处理第 %s 个' % index_count)
            self.pro()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: count cited DOIs for the listed dump files and
    # report the elapsed wall-clock time.
    counter = Counter()
    # Full run: list every entry in the Crossref dump directory, optionally
    # skipping the first 40000 entries.
    # files = InspecToRedis.load_gz('H:/crossref_public_data_file_2021_01')
    # files = files[40000:]
    job = InspecToRedis(counter=counter, file_list=['0.json'])
    started = time.time()
    job.batch()
    # Uncomment to (re)load the Inspec journal ISSN set into Redis.
    # job.inspec2redis()
    elapsed = time.time() - started
    print('耗时 %s 秒' % elapsed)
|