add:wos文件解析

main
zhaoxiangpeng 1 month ago
parent 6d4b0a7dd9
commit 669c7836b6

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# @Time : 2024/3/5 16:05
# @Author : zhaoxiangpeng
# @File : parse_data.py
import logging
from typing import Union
from science_article_add.utils.tools import str2int
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Default column tags (upper-case), kept in this exact order.
DEFAULT_TABLE_HEAD = [
    'PT', 'AU', 'BA', 'BE', 'GP', 'AF', 'BF', 'CA', 'TI', 'SO',
    'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE',
    'ID', 'AB', 'C1', 'C3', 'RP', 'EM', 'RI', 'OI', 'FU', 'FP',
    'FX', 'CR', 'NR', 'TC', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA',
    'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'PN',
    'SU', 'SI', 'MA', 'BP', 'EP', 'AR', 'DI', 'DL', 'D2', 'EA',
    'PG', 'WC', 'WE', 'SC', 'GA', 'PM', 'OA', 'HC', 'HP', 'DA',
    'UT',
]

# Lower-case counterpart of DEFAULT_TABLE_HEAD, same order.
DEFAULT_TABLE_HEAD_LOWER = list(map(str.lower, DEFAULT_TABLE_HEAD))
def to_dict(data, headers: list):
    """Convert one tab-separated record line into a dict keyed by ``headers``.

    Args:
        data: Raw record line as bytes. It is stripped of surrounding
            whitespace and decoded with the default codec (UTF-8) before
            being split on tab characters.
        headers: Column names, paired positionally with the split values.
            Pairing stops at the shorter of the two sequences.

    Returns:
        A dict mapping each paired header to its value, with empty values
        replaced by ``None``. The ``"py"`` key is always present: it holds
        whatever ``str2int`` returned for the raw value, or ``None`` when
        the conversion raised.
    """
    # NOTE(review): strip() also removes leading/trailing tabs, so a record
    # whose first column is empty would shift every column left - confirm
    # that the first column is never empty in practice.
    text = data.strip().decode()
    record = {
        key: (value or None)
        for key, value in zip(headers, text.split('\t'))
    }

    raw_year = record.get("py")
    year = None
    try:
        # The conversion is attempted exactly once, inside the guard, so a
        # failure is logged here instead of escaping to the caller.
        year = str2int(raw_year, None)
    except Exception as e:
        logger.exception(
            "\n原始数据: %s,\n数据字典: %s\n异常信息: %s", data, record, e
        )
    else:
        if not year:
            # .get() keeps this a plain warning even when the record has no
            # "ut" column.
            logger.warning(
                "WOS号: %s,年份异常: %s", record.get("ut"), raw_year
            )

    record["py"] = year
    return record
def parse_full_records_txt(content: bytes):
    """Yield one dict per data row of a CRLF-separated, tab-delimited export.

    The first line is treated as the header row. Everything before the first
    occurrence of ``PT`` in it is discarded, and the remainder is decoded as
    UTF-8, split on tabs and lower-cased to form the column names. When the
    header line contains no ``PT`` (or cannot be decoded), an error is logged
    and a built-in fallback column list is used instead.

    Args:
        content: Raw file content as bytes.

    Yields:
        The dict produced by ``to_dict`` for each remaining line, in order.
    """
    rows = iter(content.strip().split(b'\r\n'))
    # split() always returns at least one element, so next() cannot fail.
    head_line = next(rows)
    try:
        # Drop any bytes that precede the first column tag.
        # NOTE(review): presumably this skips a BOM - confirm.
        head_line = head_line[head_line.index(b'PT'):]
        # UnicodeDecodeError is a ValueError subclass, so an undecodable
        # header also lands in the fallback branch below.
        headers = head_line.strip().decode('utf-8').split('\t')
    except ValueError:
        logger.error("内容出现异常跳过: %s", head_line)
        # NOTE(review): 'Z9' appears twice in this list; with dict-based rows
        # the later column overwrites the earlier one - confirm intended.
        headers = ['PT', 'AU', 'Z2', 'AF', 'BA', 'BF', 'CA', 'GP', 'BE', 'TI', 'Z1', 'SO', 'Z3', 'SE', 'BS', 'LA', 'DT', 'CT', 'CY', 'CL', 'SP', 'HO', 'DE', 'Z5', 'ID', 'AB', 'Z4', 'C1', 'Z6', 'RP', 'EM', 'Z7', 'RI', 'OI', 'FU', 'FX', 'CR', 'NR', 'TC', 'Z9', 'Z8', 'Z9', 'U1', 'U2', 'PU', 'PI', 'PA', 'SN', 'EI', 'BN', 'J9', 'JI', 'PD', 'PY', 'VL', 'IS', 'SI', 'PN', 'SU', 'MA', 'BP', 'EP', 'AR', 'DI', 'D2', 'EA', 'EY', 'PG', 'P2', 'WC', 'SC', 'PM', 'UT', 'OA', 'HP', 'HC', 'DA', 'C3']
    headers = [name.lower() for name in headers]

    # Consume the remaining rows through the iterator; popping from the front
    # of the list would cost O(n) per row.
    for line_data in rows:
        yield to_dict(line_data, headers)
def parse_full_records(body: Union[bytes, str]):
    """
    Parse the downloaded response content.

    ``str`` input is encoded to bytes with the default codec first; the
    bytes are then handed to ``parse_full_records_txt`` and every record it
    produces is yielded unchanged.
    """
    raw = body.encode() if isinstance(body, str) else body
    yield from parse_full_records_txt(raw)
Loading…
Cancel
Save