"""File utilities: content hashing, encoding detection, text reading and cleanup."""
import hashlib
import os
from typing import List, Optional, Tuple
|
||
|
||
def get_file_hash(file_path: str) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is consumed in 4 KiB blocks so arbitrarily large files are
    hashed without loading them fully into memory.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as handle:
        while True:
            block = handle.read(4096)
            if not block:  # EOF
                break
            digest.update(block)
    return digest.hexdigest()
|
||
|
||
def detect_encoding(file_path: str, sample_size: int = 262144) -> Optional[str]:
    """Detect the text encoding of a file.

    Uses ``chardet`` when it is installed; otherwise assumes UTF-8.

    Args:
        file_path: Path of the file to inspect.
        sample_size: Maximum number of bytes to read for detection.
            Detection accuracy plateaus quickly, and the previous
            implementation read the whole file — a real bottleneck on
            large inputs.

    Returns:
        A canonical encoding name ("utf-8" for any UTF-8 variant, "gbk"
        for GBK), whatever chardet reported for other encodings, or
        ``None`` when chardet could not make a guess. Returns "utf-8"
        unconditionally when chardet is unavailable.
    """
    try:
        import chardet
    except ImportError:
        # chardet not installed: fall back to the most common encoding.
        return "utf-8"

    # Bounded read: a prefix is enough for detection (see sample_size doc).
    with open(file_path, "rb") as f:
        raw_data = f.read(sample_size)

    encoding = chardet.detect(raw_data)["encoding"]

    # Normalize the common cases; anything else (including None) passes through.
    if encoding and encoding.lower().startswith("utf-8"):
        return "utf-8"
    if encoding and encoding.lower() == "gbk":
        return "gbk"
    return encoding
|
||
|
||
def read_text_file(file_path: str) -> str:
    """Read a text file with automatic encoding detection.

    Opens the file with the encoding reported by :func:`detect_encoding`;
    if decoding fails, re-reads as UTF-8 with undecodable bytes dropped.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file contents (possibly lossy in the UTF-8 fallback path).

    Raises:
        ValueError: with message "无法识别文件编码" when no encoding could
            be detected, or "文件读取失败: ..." wrapping any other
            unexpected failure.
    """
    try:
        encoding = detect_encoding(file_path)
        if not encoding:
            raise ValueError("无法识别文件编码")

        with open(file_path, "r", encoding=encoding) as f:
            return f.read()
    except UnicodeDecodeError:
        # Detected encoding turned out to be wrong — best-effort re-read.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except ValueError:
        # Bug fix: our own ValueError above used to fall through to the
        # generic handler and get re-wrapped into a misleading
        # "文件读取失败" message. Re-raise it untouched.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise ValueError(f"文件读取失败: {str(e)}") from e
|
||
|
||
def split_into_paragraphs(text: str) -> List[str]:
    """Split text into paragraph-sized chunks.

    Strategy (first applicable wins):
      1. Numbered headings (Arabic or Chinese numerals followed by 、/./space)
         — split the text on those headings.
      2. Blank-line-separated paragraphs, when there are more than two.
      3. Sentence-level split, regrouping sentences into chunks of roughly
         50+ characters.

    Args:
        text: The raw text to split.

    Returns:
        A list of non-empty, stripped text chunks.
    """
    import re

    # 1) Heading-based split.
    title_pattern = r'(?:^|\n)([一二三四五六七八九十\d]+[\.、 ]+.*?)(?=\n[一二三四五六七八九十\d]+[\.、 ]+|$)'
    if re.findall(title_pattern, text):
        sections = re.split(title_pattern, text)
        return [s.strip() for s in sections if s.strip()]

    # 2) Blank-line paragraphs.
    paragraphs = text.split("\n\n")
    if len(paragraphs) > 2:
        return [p.strip() for p in paragraphs if p.strip()]

    # 3) Sentence-level split. The terminator is CAPTURED so every sentence
    #    keeps its own punctuation. (Bug fix: the old code re-attached
    #    punctuation via text.find(sentence), which returns the FIRST
    #    occurrence — repeated sentence texts got the wrong terminator —
    #    and it was O(n^2) to boot.)
    sentence_endings = r'[。!?.!?;;]'
    pieces = re.split(f'({sentence_endings})', text)

    combined = []
    current = ""
    # pieces alternates [text, delimiter, text, delimiter, ..., text].
    for i in range(0, len(pieces), 2):
        body = pieces[i].strip()
        if not body:
            continue
        terminator = pieces[i + 1] if i + 1 < len(pieces) else ""
        sentence = body + terminator
        current += sentence
        # Flush once the chunk is long enough or clearly ends a sentence
        # (semicolons intentionally do not flush, matching the old logic).
        if len(current) > 50 or re.search(r'[。!?.!?]$', sentence):
            combined.append(current)
            current = ""

    # Keep any trailing fragment that never hit a flush condition.
    if current:
        combined.append(current)
    return combined
|
||
|
||
def clean_text(text: str) -> str:
    """Clean text by removing noise.

    Drops page-number headers/footers, blank lines, and lines containing
    garbled-text markers, then truncates runs of identical lines to at
    most three occurrences.
    """
    import re

    noise_patterns = [re.compile(r"第\d+页"), re.compile(r"^\s*$")]

    def _is_noise(line: str) -> bool:
        # Header/footer patterns, blank lines, or garbled-text markers.
        if any(p.search(line) for p in noise_patterns):
            return True
        return "锘" in line or "???" in line

    kept = [ln for ln in text.split("\n") if not _is_noise(ln)]

    # Run-length dedup: the first line of a run plus up to 2 duplicates
    # survive; the rest of the run is dropped.
    deduped = []
    run_length = 0
    last = ""
    for ln in kept:
        if ln == last:
            run_length += 1
            if run_length < 3:
                deduped.append(ln)
        else:
            run_length = 0
            deduped.append(ln)
        last = ln

    return "\n".join(deduped)