# nodebookls/utils.py
import hashlib
import os
import re
from typing import List, Tuple


def get_file_hash(file_path: str) -> str:
    """Calculate the MD5 hash of a file."""
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        # Read in 4 KiB chunks so large files are not loaded into memory
        # all at once.
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
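
# Usage sketch (not part of the original module; the path is
# hypothetical): hashing an empty file yields the well-known
# empty-input MD5 digest.
#
#     >>> get_file_hash("/path/to/empty/file")
#     'd41d8cd98f00b204e9800998ecf8427e'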


def detect_encoding(file_path: str) -> str:
    """Detect file encoding."""
    # Use chardet if it is installed; otherwise fall back to UTF-8.
    try:
        import chardet
    except ImportError:
        chardet = None
    if chardet is None:
        # Fallback when chardet is not available.
        return "utf-8"
    with open(file_path, "rb") as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    encoding = result["encoding"]
    # Priority: UTF-8 > GBK > whatever chardet reports.
    if encoding and encoding.lower().startswith("utf-8"):
        return "utf-8"
    if encoding and encoding.lower() == "gbk":
        return "gbk"
    return encoding


def read_text_file(file_path: str) -> str:
    """Read a text file with automatic encoding detection."""
    try:
        encoding = detect_encoding(file_path)
        if not encoding:
            raise ValueError("Unable to detect the file encoding")
        with open(file_path, "r", encoding=encoding) as f:
            return f.read()
    except UnicodeDecodeError:
        # Fall back to UTF-8, silently dropping undecodable bytes.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception as e:
        raise ValueError(f"Failed to read file: {e}")


def split_into_paragraphs(text: str) -> List[str]:
    """Split text into paragraphs with improved logic."""
    # First try to split on headings: lines starting with an Arabic or
    # Chinese numeral followed by a dot, an enumeration comma, or a space.
    title_pattern = r'(?:^|\n)([一二三四五六七八九十\d]+[\.、 ]+.*?)(?=\n[一二三四五六七八九十\d]+[\.、 ]+|$)'
    if re.findall(title_pattern, text):
        # Split on the headings and drop empty fragments.
        sections = re.split(title_pattern, text)
        return [s.strip() for s in sections if s.strip()]
    # Next, try splitting on blank lines (paragraph breaks).
    paragraphs = text.split("\n\n")
    # If that yields too few paragraphs, fall back to sentence splitting.
    if len(paragraphs) <= 2:
        # Split on sentence-ending punctuation, keeping each delimiter so
        # it can be re-attached to the sentence it terminates.
        parts = re.split(r'([。!?.!?;])', text)
        sentences = []
        for i in range(0, len(parts) - 1, 2):
            sentence = (parts[i] + parts[i + 1]).strip()
            if sentence:
                sentences.append(sentence)
        if parts[-1].strip():
            # Trailing text with no terminating punctuation.
            sentences.append(parts[-1].strip())
        # Merge short sentences into chunks of a reasonable length.
        combined_sentences = []
        current_sentence = ""
        for sentence in sentences:
            current_sentence += sentence
            # Flush once the chunk is long enough or ends with a strong
            # sentence terminator.
            if len(current_sentence) > 50 or re.search(r'[。!?.!?]$', sentence):
                combined_sentences.append(current_sentence)
                current_sentence = ""
        if current_sentence:
            combined_sentences.append(current_sentence)
        return combined_sentences
    # Filter out empty paragraphs.
    return [p.strip() for p in paragraphs if p.strip()]
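
# Heading-branch example (illustrative input):
#
#     >>> split_into_paragraphs("一、 Intro\n二、 Body\n三、 End")
#     ['一、 Intro', '二、 Body', '三、 End']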


def clean_text(text: str) -> str:
    """Clean text by removing noise."""
    lines = text.split("\n")
    cleaned_lines = []
    # Header/footer noise (e.g. "3页" page markers) and blank lines.
    skip_patterns = [r"\d+页", r"^\s*$"]
    for line in lines:
        # Skip lines matching a header/footer pattern.
        if any(re.search(pattern, line) for pattern in skip_patterns):
            continue
        # Skip lines containing garbled text (Unicode replacement
        # characters or runs of question marks).
        if "\ufffd" in line or "???" in line:
            continue
        cleaned_lines.append(line)
    # Collapse runs of identical lines, keeping at most the original
    # plus two duplicates.
    final_lines = []
    prev_line = ""
    repeat_count = 0
    for line in cleaned_lines:
        if line == prev_line:
            repeat_count += 1
            if repeat_count < 3:
                final_lines.append(line)
        else:
            repeat_count = 0
            final_lines.append(line)
        prev_line = line
    return "\n".join(final_lines)