# -*- coding: utf-8 -*-
# @Time : 2024/2/1 17:10
# @Author : zhaoxiangpeng
# @File : 合并小文件.py  (merge small files)
import os
import re
import warnings

import chardet
import pandas as pd
from loguru import logger


def read_standard(filename):
    """Read a tab-separated file, skipping bad lines and ignoring encoding errors."""
    table = pd.read_table(filename, encoding_errors='ignore', on_bad_lines='skip', low_memory=False)
    return table


def merge_files(path):
    """Concatenate every file under *path* into one tab-separated master CSV."""
    big_table = pd.DataFrame()
    files = os.listdir(path)
    for file in files:
        file_path = os.path.join(path, file)
        table = read_standard(file_path)
        big_table = pd.concat([big_table, table])
    big_table.to_csv(os.path.join("Y:\\zhaoxiangpeng\\2024BCR", '2024BCR总表.csv'), sep='\t', index=False)


def read_file(file_path, encoding: str = 'gbk', error: bool = False):
    """Open a text file; when *error* is True, detect the real encoding with chardet first."""
    if not error:
        f = open(file_path, encoding=encoding)
    else:
        warnings.warn('%s encoding problem, running detection' % file_path)
        with open(file_path, 'rb') as check:
            data = check.read()
        info = chardet.detect(data)
        encoding = info['encoding']
        warnings.warn('%s trying to decode with "%s"' % (file_path, encoding))
        f = open(file_path, encoding=encoding)
    return f, encoding


def merge_files_by_row(path):
    """
    Read the small files line by line and repackage them into standard
    shards of 100,000 records each.
    """
    ERROR_FILE = ['ALL-CM.CSV', 'ALL-HDX.CSV', '失败记录第二次重采-20231228 15时53分下载.csv']
    data_column_count = 35
    files = os.listdir(path)
    decode_table = dict()
    split_str = '\t'
    documents = []
    document_count = 0
    file_seq = 1
    for file in files:
        if file in ERROR_FILE:
            split_str = ','
            logger.warning("File may have been modified, skipping %s" % file)
            continue
        else:
            split_str = '\t'
        file_path = os.path.join(path, file)
        logger.info('Processing %s' % file_path)
        f, code = read_file(file_path)
        try:
            h = f.readline()
            head = h.strip('\n').split(split_str)
            logger.debug("Header length: %s, %s" % (len(head), head))
        except UnicodeDecodeError:
            # The default encoding failed on the header; re-open with a detected encoding.
            f, code = read_file(file_path, error=True)
            h = f.readline()
            head = h.strip('\n').split(split_str)
            logger.debug("Header length: %s, %s" % (len(head), head))
        decode_table[file] = code  # remember which encoding each file was read with
        if '' in head:
            data_column_count = head.index('')
        if len(head) > data_column_count:
            head = head[:data_column_count]
        # print(head)
        line = None
        while True:
            try:
                line = f.readline()
            except UnicodeDecodeError:
                logger.info('Decode error; last readable line: %s' % line)
                continue
            if not line:
                break
            data = line.strip('\n').split(split_str)
            documents.append(
                dict(zip(head, data[:data_column_count]))
            )
            document_count += 1
            if document_count >= 1e5:
                shard = os.path.join("Y:\\zhaoxiangpeng\\2024BCR\\After", '%s.csv' % file_seq)
                logger.info("Record count reached %s, saving a shard: %s" % (document_count, shard))
                big_table = pd.DataFrame(documents)
                logger.info("Shape: %s %s" % big_table.shape)
                big_table.to_csv(shard, sep='\t', index=False)
                file_seq += 1
                documents = []
                document_count = 0
        f.close()
    # Flush whatever is left after the last file as the final shard.
    shard = os.path.join("Y:\\zhaoxiangpeng\\2024BCR\\After", '%s.csv' % file_seq)
    logger.info("Saving the final shard (%s records): %s" % (document_count, shard))
    big_table = pd.DataFrame(documents)
    big_table.to_csv(shard, sep='\t', index=False)
    logger.info("File encoding table: %s" % decode_table)


def merge_error_file(path, files: list):
    """Merge the listed problem files, then re-split the result into 100,000-row shards."""
    big_table = pd.DataFrame()
    for file in files:
        file_full_path = os.path.join(path, file)
        small_table = pd.read_csv(file_full_path, low_memory=False, encoding_errors='ignore', on_bad_lines='skip')
        print(small_table.shape)
        big_table = pd.concat([big_table, small_table])
    start = 0
    split = 100000
    row, col = big_table.shape
    file_idx = 101  # start numbering at 101 so earlier shards are not overwritten
    for x in range(start, row, split):
        table = big_table[x: x + split]
        table.to_csv(os.path.join('Y:\\zhaoxiangpeng\\2024BCR\\After', '%s.csv' % file_idx), index=False, sep='\t')
        file_idx += 1


def merge_standard_file(path):
    files = os.listdir(path)
    big_table = pd.DataFrame()
    for file in files:
        file_full_path = os.path.join(path, file)
        small_table = pd.read_csv(file_full_path, sep='\t', low_memory=False)
        big_table = pd.concat([big_table, small_table])
    row, col = big_table.shape
    split = 1000000
    file_idx = 1
    for x in range(0, row, split):
        table = big_table[x: x + split]
        table.to_csv(os.path.join("Y:\\zhaoxiangpeng\\2024BCR\\MergeFile", '%s.csv' % file_idx), index=False, sep='\t')
        file_idx += 1


def merge_small_file(path):
    """Merge small CSV files and export them as 800,000-row Excel workbooks."""
    files = os.listdir(path)
    big_table = pd.DataFrame()
    for file in files:
        file_full_path = os.path.join(path, file)
        small_table = pd.read_csv(file_full_path, index_col=False, low_memory=False, encoding_errors='ignore', on_bad_lines='skip')
        big_table = pd.concat([big_table, small_table])
    row, col = big_table.shape
    split = 800000
    file_idx = 1
    for x in range(0, row, split):
        table = big_table[x: x + split]
        table.to_excel(os.path.join(r"Y:\BCR\BCR202412\BCR2024书目补采API", '%s.xlsx' % file_idx), index=False)
        file_idx += 1


def find_eid_by_regex(text):
    res = re.search(r'2-s2\.0-\d+', text)
    if res:
        return res.group(0)
    return None


def batch_match(path):
    """Scan every file under *path* line by line and write the EIDs that match to eid.csv."""
    count = 0
    line_count = 0
    writer = open('Y:\\zhaoxiangpeng\\BCR\\2025BCR\\eid.csv', 'a+', encoding='utf-8')
    writer.write('EID' + '\n')
    file_list = os.listdir(path)
    for fname in file_list:
        file = os.path.join(path, fname)
        with open(file, encoding='utf-8') as f:
            while line := f.readline():
                line_count += 1
                eid = find_eid_by_regex(line)
                if not eid:
                    print(line)
                else:
                    count += 1
                    writer.write(eid + '\n')
    writer.close()
    print('Total lines: %s\nMatched: %s' % (line_count, count))


def func11():
    """List the EIDs in eid.csv that are missing from the merged Excel files."""
    path = 'Y:\\zhaoxiangpeng\\BCR\\2025BCR'
    path2 = os.path.join(path, 'MergeFile')
    files = os.listdir(path2)
    big_table = pd.DataFrame()
    for file in files:
        file_full_path = os.path.join(path2, file)
        small_table = pd.read_excel(file_full_path, engine='openpyxl', sheet_name=0)
        small_table = small_table[['EID']]
        print(small_table.shape)
        big_table = pd.concat([big_table, small_table])
    big_table.drop_duplicates(subset=['EID'], inplace=True)
    t2 = pd.read_csv(os.path.join(path, 'eid.csv'))
    t2.drop_duplicates(subset=['EID'], inplace=True)
    t2.rename(columns={'EID': "EID2"}, inplace=True)
    # Left join: rows whose EID is NaN exist only in eid.csv and still need to be collected.
    t0 = pd.merge(t2, big_table, how='left', left_on=['EID2'], right_on=['EID'])
    print(t0)
    t0[t0['EID'].isna()]['EID2'].to_csv(os.path.join(path, 'eid2.csv'), index=False)


if __name__ == '__main__':
    # merge_files_by_row("Y:\\zhaoxiangpeng\\2024BCR\\API采集数据")
    # merge_error_file("Y:\\zhaoxiangpeng\\2024BCR\\API采集数据",
    #                  files=['ALL-CM.CSV', 'ALL-HDX.CSV', '失败记录第二次重采-20231228 15时53分下载.csv'])
    # merge_standard_file('Y:\\zhaoxiangpeng\\2024BCR\\After')
    merge_small_file(r'Y:\BCR\BCR202412\BCR2024书目补采API')
    # batch_match('Y:\\zhaoxiangpeng\\BCR\\2025BCR\\API采集')
    # func11()