477 lines
19 KiB
Python
477 lines
19 KiB
Python
import re
|
||
import random
|
||
import argparse
|
||
import sys
|
||
import os
|
||
from typing import List, Tuple, Optional, Dict, Any
|
||
from pathlib import Path
|
||
import logging
|
||
|
||
|
||
class TextProcessor:
|
||
"""文本处理器类,支持句子拆分和字符交换"""
|
||
|
||
def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
|
||
"""
|
||
初始化文本处理器
|
||
|
||
Args:
|
||
min_length: 句子长度阈值
|
||
custom_punctuation: 自定义标点符号,如果为None则使用默认标点
|
||
"""
|
||
self.min_length = min_length
|
||
self.sentence_endings = custom_punctuation or r'[,。!?;?!;]'
|
||
self.statistics = {
|
||
'total_sentences': 0,
|
||
'processed_sentences': 0,
|
||
'total_chars': 0,
|
||
'swapped_chars': 0
|
||
}
|
||
|
||
# 设置日志
|
||
logging.basicConfig(level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s')
|
||
self.logger = logging.getLogger(__name__)
|
||
|
||
def split_sentences(self, text: str) -> List[Tuple[str, str]]:
|
||
"""
|
||
按标点符号拆分句子,保留标点符号
|
||
|
||
Args:
|
||
text: 输入文本
|
||
|
||
Returns:
|
||
List[Tuple[str, str]]: 每个元组包含 (句子内容, 标点符号)
|
||
"""
|
||
if not text.strip():
|
||
return []
|
||
|
||
# 使用正则表达式拆分,保留分隔符
|
||
parts = re.split(f'({self.sentence_endings})', text)
|
||
|
||
sentences = []
|
||
i = 0
|
||
while i < len(parts):
|
||
content = parts[i].strip()
|
||
if content: # 非空内容
|
||
# 检查下一个部分是否是标点符号
|
||
if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]):
|
||
punctuation = parts[i + 1]
|
||
i += 2
|
||
else:
|
||
punctuation = ''
|
||
i += 1
|
||
sentences.append((content, punctuation))
|
||
self.statistics['total_sentences'] += 1
|
||
else:
|
||
i += 1
|
||
|
||
return sentences
|
||
|
||
def swap_random_chars(self, sentence: str) -> str:
|
||
"""
|
||
对超长句子随机交换相邻两个字符的顺序
|
||
|
||
Args:
|
||
sentence: 输入句子
|
||
|
||
Returns:
|
||
str: 处理后的句子
|
||
"""
|
||
# 边界情况处理
|
||
if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
|
||
return sentence
|
||
|
||
# 转换为字符列表便于操作
|
||
chars = list(sentence)
|
||
original_length = len(chars)
|
||
|
||
# 确定可交换的范围(避开首尾字符,且需要成对相邻)
|
||
# 对于长度为n的句子,可交换的相邻对位置为:(1,2), (2,3), ..., (n-3,n-2)
|
||
start_idx = 1
|
||
end_idx = len(chars) - 3 # 最后一个可交换对的起始位置
|
||
|
||
if end_idx < start_idx:
|
||
return sentence
|
||
|
||
try:
|
||
# 随机选择一个相邻对的起始位置
|
||
swap_start = random.randint(start_idx, end_idx)
|
||
swap_end = swap_start + 1
|
||
|
||
# 交换相邻的两个字符
|
||
chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start]
|
||
|
||
# 更新统计信息
|
||
self.statistics['processed_sentences'] += 1
|
||
self.statistics['swapped_chars'] += 2
|
||
|
||
self.logger.debug(f"交换相邻位置 {swap_start} 和 {swap_end},句子长度:{original_length}")
|
||
|
||
except (ValueError, IndexError) as e:
|
||
self.logger.warning(f"字符交换失败:{e}")
|
||
return sentence
|
||
|
||
return ''.join(chars)
|
||
|
||
def process_text(self, text: str) -> str:
|
||
"""
|
||
处理文本:拆分句子并对超长句子进行字符交换
|
||
|
||
Args:
|
||
text: 输入文本
|
||
|
||
Returns:
|
||
str: 处理后的文本
|
||
"""
|
||
if not text:
|
||
return text
|
||
|
||
# 重置统计信息
|
||
self.statistics = {
|
||
'total_sentences': 0,
|
||
'processed_sentences': 0,
|
||
'total_chars': len(text),
|
||
'swapped_chars': 0
|
||
}
|
||
|
||
# 按段落分割
|
||
paragraphs = text.split('\n')
|
||
processed_paragraphs = []
|
||
|
||
for paragraph in paragraphs:
|
||
if not paragraph.strip():
|
||
processed_paragraphs.append(paragraph)
|
||
continue
|
||
|
||
# 拆分句子
|
||
sentences = self.split_sentences(paragraph)
|
||
|
||
# 处理每个句子
|
||
processed_sentences = []
|
||
for sentence_content, punctuation in sentences:
|
||
# 对句子内容进行字符交换
|
||
processed_content = self.swap_random_chars(sentence_content)
|
||
processed_sentences.append(processed_content + punctuation)
|
||
|
||
# 重新组合句子
|
||
processed_paragraph = ''.join(processed_sentences)
|
||
processed_paragraphs.append(processed_paragraph)
|
||
|
||
return '\n'.join(processed_paragraphs)
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
|
||
"""获取处理统计信息"""
|
||
return self.statistics.copy()
|
||
|
||
def print_statistics(self):
|
||
"""打印处理统计信息"""
|
||
stats = self.get_statistics()
|
||
print("\n" + "=" * 50)
|
||
print("处理统计信息:")
|
||
print(f"总字符数:{stats['total_chars']}")
|
||
print(f"总句子数:{stats['total_sentences']}")
|
||
print(f"处理句子数:{stats['processed_sentences']}")
|
||
print(f"交换字符数:{stats['swapped_chars']}")
|
||
if stats['total_sentences'] > 0:
|
||
print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
|
||
print("=" * 50)
|
||
|
||
|
||
class FileHandler:
|
||
"""文件处理器,负责文件的读写操作"""
|
||
|
||
@staticmethod
|
||
def read_file(filename: str) -> str:
|
||
"""
|
||
读取文件内容,支持多种编码
|
||
|
||
Args:
|
||
filename: 文件路径
|
||
|
||
Returns:
|
||
str: 文件内容
|
||
|
||
Raises:
|
||
FileNotFoundError: 文件不存在
|
||
PermissionError: 权限不足
|
||
UnicodeDecodeError: 编码错误
|
||
"""
|
||
if not os.path.exists(filename):
|
||
raise FileNotFoundError(f"文件 '{filename}' 不存在")
|
||
|
||
if not os.access(filename, os.R_OK):
|
||
raise PermissionError(f"没有读取文件 '{filename}' 的权限")
|
||
|
||
# 尝试多种编码格式
|
||
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
|
||
|
||
for encoding in encodings:
|
||
try:
|
||
with open(filename, 'r', encoding=encoding) as f:
|
||
content = f.read()
|
||
logging.info(f"使用 {encoding} 编码成功读取文件:{filename}")
|
||
return content
|
||
except UnicodeDecodeError:
|
||
continue
|
||
|
||
raise ValueError(f"无法解码文件 '{filename}',尝试的编码格式:{encodings}")
|
||
|
||
@staticmethod
|
||
def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
|
||
"""
|
||
写入文件内容
|
||
|
||
Args:
|
||
filename: 输出文件路径
|
||
content: 要写入的内容
|
||
encoding: 编码格式
|
||
|
||
Raises:
|
||
PermissionError: 权限不足
|
||
OSError: 磁盘空间不足等系统错误
|
||
"""
|
||
# 确保目录存在
|
||
output_dir = os.path.dirname(filename)
|
||
if output_dir and not os.path.exists(output_dir):
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
try:
|
||
with open(filename, 'w', encoding=encoding) as f:
|
||
f.write(content)
|
||
logging.info(f"成功写入文件:{filename}")
|
||
except PermissionError:
|
||
raise PermissionError(f"没有写入文件 '{filename}' 的权限")
|
||
except OSError as e:
|
||
raise OSError(f"写入文件 '{filename}' 时发生错误:{e}")
|
||
|
||
|
||
def setup_argument_parser() -> argparse.ArgumentParser:
|
||
"""设置命令行参数解析器"""
|
||
parser = argparse.ArgumentParser(
|
||
description='文本句子字符交换处理器',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
使用示例:
|
||
%(prog)s -f input.txt # 处理文件
|
||
%(prog)s -t "你的文本内容" # 直接处理文本
|
||
%(prog)s -f input.txt -l 20 # 设置长度阈值为20
|
||
%(prog)s -f input.txt -o output.txt # 输出到文件
|
||
%(prog)s -f input.txt -p "。!?" -s # 自定义标点符号并显示统计
|
||
"""
|
||
)
|
||
|
||
# 输入选项
|
||
input_group = parser.add_mutually_exclusive_group(required=True)
|
||
input_group.add_argument('-f', '--file', help='输入文件路径')
|
||
input_group.add_argument('-t', '--text', help='直接输入文本')
|
||
input_group.add_argument('--stdin', action='store_true',
|
||
help='从标准输入读取文本')
|
||
|
||
# 处理选项
|
||
parser.add_argument('-l', '--length', type=int, default=30,
|
||
help='句子长度阈值(默认30)')
|
||
parser.add_argument('-p', '--punctuation',
|
||
help='自定义标点符号(默认:。!?;?!;)')
|
||
parser.add_argument('-o', '--output', help='输出文件路径')
|
||
parser.add_argument('-e', '--encoding', default='utf-8',
|
||
help='输出文件编码(默认utf-8)')
|
||
|
||
# 其他选项
|
||
parser.add_argument('-s', '--statistics', action='store_true',
|
||
help='显示处理统计信息')
|
||
parser.add_argument('-v', '--verbose', action='store_true',
|
||
help='显示详细日志')
|
||
parser.add_argument('--seed', type=int, help='随机数种子(用于测试)')
|
||
|
||
return parser
|
||
|
||
|
||
def main():
|
||
"""主函数:处理命令行参数和文本处理"""
|
||
parser = setup_argument_parser()
|
||
args = parser.parse_args()
|
||
|
||
# 设置日志级别
|
||
if args.verbose:
|
||
logging.getLogger().setLevel(logging.DEBUG)
|
||
|
||
# 设置随机数种子(用于测试)
|
||
if args.seed:
|
||
random.seed(args.seed)
|
||
|
||
# 获取输入文本
|
||
try:
|
||
if args.file:
|
||
text = FileHandler.read_file(args.file)
|
||
elif args.text:
|
||
text = args.text
|
||
elif args.stdin:
|
||
text = sys.stdin.read()
|
||
else:
|
||
print("错误:请指定输入源")
|
||
sys.exit(1)
|
||
|
||
if not text.strip():
|
||
print("警告:输入文本为空")
|
||
sys.exit(0)
|
||
|
||
except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
|
||
print(f"错误:{e}")
|
||
sys.exit(1)
|
||
|
||
# 创建处理器并处理文本
|
||
try:
|
||
processor = TextProcessor(
|
||
min_length=args.length,
|
||
custom_punctuation=args.punctuation
|
||
)
|
||
|
||
processed_text = processor.process_text(text)
|
||
|
||
# 输出结果
|
||
if args.output:
|
||
FileHandler.write_file(args.output, processed_text, args.encoding)
|
||
print(f"处理完成,结果已保存到 '{args.output}'")
|
||
else:
|
||
print("处理结果:")
|
||
print("-" * 50)
|
||
print(processed_text)
|
||
|
||
# 显示统计信息
|
||
if args.statistics:
|
||
processor.print_statistics()
|
||
|
||
except Exception as e:
|
||
print(f"处理过程中发生错误:{e}")
|
||
if args.verbose:
|
||
import traceback
|
||
traceback.print_exc()
|
||
sys.exit(1)
|
||
|
||
|
||
# 单元测试
|
||
def run_tests():
|
||
"""运行基本的单元测试"""
|
||
print("运行单元测试...")
|
||
|
||
# 测试句子拆分
|
||
processor = TextProcessor(min_length=6)
|
||
|
||
# 测试1:普通句子拆分
|
||
test_text = "这是第一句。这是第二句!第三句?"
|
||
sentences = processor.split_sentences(test_text)
|
||
assert len(sentences) == 3, f"期望3个句子,实际{len(sentences)}个"
|
||
assert sentences[0] == ("这是第一句", "。"), f"第一句解析错误:{sentences[0]}"
|
||
|
||
# 测试2:相邻字符交换
|
||
long_sentence = "这是一个很长的句子用来测试字符交换功能"
|
||
random.seed(42) # 固定种子以便测试
|
||
result = processor.swap_random_chars(long_sentence)
|
||
assert result != long_sentence, "长句子应该被修改"
|
||
assert len(result) == len(long_sentence), "交换后长度应该不变"
|
||
|
||
# 验证只交换了相邻的两个字符
|
||
diff_count = sum(1 for i, (a, b) in enumerate(zip(long_sentence, result)) if a != b)
|
||
assert diff_count == 2, f"应该只有2个字符位置发生变化,实际{diff_count}个"
|
||
|
||
# 测试3:短句子不变
|
||
short_sentence = "短句"
|
||
result = processor.swap_random_chars(short_sentence)
|
||
assert result == short_sentence, "短句子不应该被修改"
|
||
|
||
# 测试4:边界情况
|
||
empty_result = processor.swap_random_chars("")
|
||
assert empty_result == "", "空字符串应该保持不变"
|
||
|
||
print("✓ 所有测试通过!")
|
||
|
||
|
||
# 示例使用
|
||
def replace_text(text):
|
||
# 检查是否运行测试
|
||
if len(sys.argv) > 1 and sys.argv[1] == 'test':
|
||
run_tests()
|
||
sys.exit(0)
|
||
|
||
# 实际的文本处理逻辑
|
||
processor = TextProcessor(min_length=30)
|
||
return processor.process_text(text)
|
||
|
||
if __name__ == "__main__":
|
||
# 命令行模式
|
||
if len(sys.argv) > 1:
|
||
main()
|
||
else:
|
||
# 示例演示
|
||
sample_text = """阅读此文之前,麻烦您点击一下"关注",既方便您进行讨论和分享,又能给您带来不一样的参与感,创作不易,感谢您的支持。
|
||
|
||
曾经"半路出家",如今黯然无声,他的故事值得一品"""
|
||
|
||
print("示例演示:")
|
||
print("原文:")
|
||
print(sample_text)
|
||
print("\n" + "=" * 50 + "\n")
|
||
min_length = 12
|
||
processor = TextProcessor(min_length)
|
||
processed = processor.process_text(sample_text)
|
||
print("处理后:")
|
||
print(processed)
|
||
|
||
processor.print_statistics()
|
||
|
||
|
||
print("\n使用说明:")
|
||
print("命令行用法:")
|
||
print(" python script.py -f input.txt # 处理文件")
|
||
print(" python script.py -t '你的文本内容' # 直接处理文本")
|
||
print(" python script.py -f input.txt -l 20 # 设置长度阈值为20")
|
||
print(" python script.py -f input.txt -o output.txt # 输出到文件")
|
||
print(" python script.py -f input.txt -p '。!?' -s # 自定义标点符号并显示统计")
|
||
print(" python script.py test # 运行单元测试")
|
||
|
||
|
||
|
||
text = """阅读此文之前,麻烦您点击一下“关注”,既方便您进行讨论和分享,又能给您带来不一样的参与感,创作不易,感谢您的支持。
|
||
|
||
曾经“半路出家”,如今黯然无声,他的故事值得一品
|
||
说起央视的主持人,大家第一反应肯定是一个个字正腔圆、形象出彩的脸孔。可是在这其中,有一位却用浓厚的潮汕口音,还有点“油滑”的幽默,自成一派。他就是阿丘。
|
||
|
||
这个名字,可能在现在已经鲜有人提起,但在过去,他可是实打实的“名嘴”。不过,咱来说点耐人寻味的,他是怎么走到央视巅峰,又怎么“高台跳水”的?这故事,够扎心,更够意味深长。
|
||
|
||
看似格格不入,却杀出重围
|
||
熟悉阿丘的人,一听他那口音,就知道这是“岭南口音模块”的标配。他是个土生土长的广东人,也因为家里是军人家庭,小时候经常搬家,成了个活脱脱的语言天才,学会了好几个地方的方言。什么潮汕话、粤语、客家话,信手拈来。不得不说,小时候到处跑打下的基础,倒给他多了一点和别人不一样的“人味儿”。
|
||
|
||
他大学学的专业,可和主持半毛钱关系没有,是经济学。毕业后,他分配到了南宁的棉纺印染厂,待遇不错,是个政工干部。这时候的阿丘,怎么看都是个稳稳的职场小白,可谁能想到,后来的他能走上舞台呢?
|
||
|
||
90年代,相声、小品各类幽默比赛风靡全国。阿丘平时最爱的就是琢磨这些妙语段子,一心觉得自己是个“未被发现的宝藏男孩”。机会来了,1992年,他参加广西举办的笑星大赛,居然拿了个一等奖。这下可出名了,厂里人都认识他,他本人也成了“地方笑星”。
|
||
|
||
再后来,他调到了广西电视台,开始主持节目。头几年波澜不惊,直到他参加《南北笑星火辣辣》,凭借风趣和机灵吸引了更多目光。2003年,这个来自地方台的主持人,直接杀进了央视主持圈。靠什么?靠他的个性和风格。
|
||
|
||
从风光无限到画风突变
|
||
阿丘进入央视后,主持了好几档节目。他的幽默和接地气,与当时一板一眼的正规主持人大不相同。因此,他迅速被贴上“个性主持”的标签。尤其是在《社会记录》里,那带点潮汕腔调的问句,竟成了一种标志。
|
||
|
||
可惜,说话爽快的他,也因为“不当言论”栽了跟头。事情发生在2020年,正是全国上下齐心合力抗击疫情的时候。阿丘不知道怎么回事,在自己的博客里发了一些让人难以接受的言论。里头什么“东亚病夫”“道歉”显得格外刺眼。
|
||
|
||
不得不说,这一锅凉水泼得够彻底。网友立刻开始深挖,一挖还真揭出不少黑历史。有人爆料,他婚内包养女大学生,还试图给实习机会。虽然阿丘本人否认得七七八八,但这些传闻和再度破裂的婚姻,难免让人联想。
|
||
|
||
面对铺天盖地的指责,阿丘的态度是硬得离谱,一句道歉都没有。“嘴皮子”在这时候完全失灵了。要说在镜头前笑侃万事的大叔,这一次是真没能站住脚。
|
||
|
||
离开央视后的低调生活
|
||
最后,阿丘与央视长达12年的缘分彻底告一段落。此后的日子,他也算是从公众视线中消失了。最让人记得的,是他两年后现身老搭档张泉灵的节目,只是,这一次,他的亮相显得缥缈又散淡。
|
||
|
||
如今阿丘的身份,更多转向了自媒体。开了个叫“阿丘观山”的账号,做起旅游文化博主。视频里,他介绍名山大川,什么五台山、武当山,天天讲人生感悟。这画风,和过去主持访谈节目的他,可真是差太远了。
|
||
|
||
不少老观众打开他的账号,可能都得感叹一声“物是人非”。更有网友直言,他的语气里听到了些许“悔意”,又觉得是假装云淡风轻,实际还是难以摆脱舆论的阴影。
|
||
|
||
留下的启示和争议
|
||
阿丘的故事,是难得一见的。从地方电视台到央视舞台,他用12年时间登上顶峰,却因为12个字毁了前程。这起伏,真像一出大戏。
|
||
|
||
咱们反思一下,也许有些人,天赋、机遇都抓得很精准,但言行失当,永远是会砸场子的导火索。阿丘的人生轨迹,正说明了这一点。
|
||
|
||
现在问题来了,大家怎么看阿丘这个人?你是觉得他个性可惜,还是自毁前程?
|
||
|
||
欢迎留言讨论,你们的每一次互动,都是创作的动力。"""
|
||
|
||
|