Modify web content fetching code

taiyi 2025-10-25 16:45:02 +08:00
parent c9386084e8
commit c6cafb6998
11 changed files with 3209 additions and 0 deletions

1579 ArticleReplace.py Normal file

File diff suppressed because it is too large

173 ai_studio.py Normal file

@@ -0,0 +1,173 @@
import json
import requests
from config import *

# ========================== Call the Dify workflow ==========================
def call_dify_workflow(input_data):
    """
    Call a Dify workflow.
    :param input_data: input data passed to the workflow
    :return: the workflow's output
    """
    logger.info("Dify workflow starting...")
    api_key = CONFIG['Dify']['api_key']
    user_id = CONFIG['Dify']['user_id']
    url = CONFIG['Dify']['url']
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    data = {
        "inputs": input_data,
        "response_mode": "blocking",
        "user": user_id
    }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    json_data = json.loads(response.text)
    print("json_data:", json_data)
    # Extract the "article" value from the workflow outputs
    article = json_data['data']['outputs']['article']
    return article

# ========================== Call the Coze workflow ==========================
def call_coze_workflow(parameters):
    """
    Call a Coze workflow.
    :param parameters: input parameters for the workflow (dict)
    :return: the workflow's execution result
    """
    logger.info("Coze workflow starting...")
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        return response.text
    else:
        return {
            "error": f"Request failed, status code: {response.status_code}",
            "detail": response.text
        }

def call_coze_article_workflow(parameters):
    """
    Call a Coze workflow and return the "output" field of its result.
    :param parameters: input parameters for the workflow (dict)
    :return: the workflow's execution result
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        # The response body is JSON, and its "data" field is itself a
        # JSON-encoded string, so parse both with json.loads
        # (ast.literal_eval chokes on JSON's true/false/null).
        result_dict = json.loads(response.text)
        data_dict = json.loads(result_dict['data'])
        output_value = data_dict['output']
        return output_value
    else:
        return {
            "error": f"Request failed, status code: {response.status_code}",
            "detail": response.text
        }

def call_coze_all_article_workflow(parameters):
    """
    Call a Coze workflow and return both the generated title and article.
    :param parameters: input parameters for the workflow (dict)
    :return: (title, article) on success, or an error dict on failure
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 200:
        # Parse the JSON response; the "data" field is a nested JSON string.
        result_dict = json.loads(response.text)
        print(result_dict)
        data_dict = json.loads(result_dict['data'])
        title = data_dict['title']
        article = data_dict['article']
        return title, article
    else:
        return {
            "error": f"Request failed, status code: {response.status_code}",
            "detail": response.text
        }
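The Coze helpers above assume a blocking run whose JSON body carries a nested JSON string in `data`. A minimal usage sketch, with hypothetical parameter values (the workflow must return `title` and `article` fields, as the code above expects):

if __name__ == "__main__":
    params = {"title": "原标题", "article": "原文内容"}  # hypothetical inputs
    result = call_coze_all_article_workflow(params)
    if isinstance(result, tuple):  # the success path returns (title, article)
        new_title, new_article = result
        print(new_title)
    else:  # the failure path returns an error dict
        print(result["error"])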

121 config.py Normal file

@@ -0,0 +1,121 @@
import configparser
import getpass
import logging
import os

# Path to the configuration file
CONFIG_FILE = "config.ini"

# Default configuration
DEFAULT_CONFIG = {
    "General": {
        "chrome_user_dir": f"C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data",
        "articles_path": "articles",
        "images_path": "picture",
        "title_file": "文章链接.xlsx",
        "max_threads": "3"
    },
    "Coze": {
        "workflow_id": "",
        "access_token": "",
        "is_async": "false",
        "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
        "last_used_template": "",
        "last_used_template_type": "文章"
    },
    "Database": {
        "host": "27.106.125.150",
        "user": "root",
        "password": "taiyi.1224",
        "database": "toutiao"
    },
    "Dify": {
        "api_key": "app-87gssUKFBs9BwJw4m95uUcyF",
        "user_id": "toutiao",
        "url": "http://27.106.125.150/v1/workflows/run"
    },
    "Baidu": {
        "api_key": "",
        "secret_key": ""
    },
    "ImageModify": {
        "crop_percent": "0.02",
        "min_rotation": "0.3",
        "max_rotation": "3.0",
        "min_brightness": "0.8",
        "max_brightness": "1.2",
        "watermark_text": "Qin Quan Shan Chu",
        "watermark_opacity": "128",
        "overlay_opacity": "30"
    },
    "Keywords": {
        "banned_words": "珠海,落马,股票,股市,股民,爆炸,火灾,死亡,抢劫,诈骗,习大大,习近平,政府,官员,扫黑,警察,落网,嫌疑人,通报,暴力执法,执法,暴力,气象,天气,暴雨,大雨"
    }
}

# Load the configuration
def load_config():
    config = configparser.ConfigParser()
    # If the config file does not exist, create it with the defaults
    if not os.path.exists(CONFIG_FILE):
        for section, options in DEFAULT_CONFIG.items():
            config[section] = options
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            config.write(f)
    else:
        config.read(CONFIG_FILE, encoding='utf-8')
        # Add any missing sections and options
        for section, options in DEFAULT_CONFIG.items():
            if not config.has_section(section):
                config[section] = {}
            for option, value in options.items():
                if not config.has_option(section, option):
                    config[section][option] = value
        # Save the updated configuration
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            config.write(f)
    return config

# Save the configuration
def save_config(config):
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        config.write(f)

# Load the configuration
CONFIG = load_config()

# Update the global variables
USER_DIR_PATH = CONFIG['General']['chrome_user_dir']
ARTICLES_BASE_PATH = CONFIG['General']['articles_path']
IMGS_BASE_PATH = CONFIG['General']['images_path']
TITLE_BASE_PATH = CONFIG['General']['title_file']
MAX_THREADS = int(CONFIG['General']['max_threads'])

# Create the required directories
if not os.path.exists(ARTICLES_BASE_PATH):
    os.makedirs(ARTICLES_BASE_PATH)
    os.chmod(ARTICLES_BASE_PATH, 0o777)
if not os.path.exists(IMGS_BASE_PATH):
    os.makedirs(IMGS_BASE_PATH)
    os.chmod(IMGS_BASE_PATH, 0o777)

# Logging configuration
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("article_replace.log", encoding='utf-8'),
                        logging.StreamHandler()
                    ])
logger = logging.getLogger(__name__)

# Path of the log file
LOG_FILE = "article_replace.log"
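A small usage sketch for the config module (assumes config.ini sits in the working directory; note that configparser stores every value as a string):

from config import CONFIG, save_config

CONFIG['General']['max_threads'] = '5'  # values must be strings
save_config(CONFIG)
print(int(CONFIG['General']['max_threads']))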

86 databases.py Normal file

@@ -0,0 +1,86 @@
import pymysql

# ============================== Database module ===================================
def check_link_exists(host, user, password, database, link):
    """
    Check whether the given link already exists in the `links` table.
    :param host: MySQL host address
    :param user: MySQL user name
    :param password: MySQL password
    :param database: database name
    :param link: the link to check
    :return: True if the link exists, False if it does not (or on database error)
    """
    connection = None  # make sure connection is initialized
    try:
        # Connect to the MySQL database
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        with connection.cursor() as cursor:
            # Check whether the link exists
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            result = cursor.fetchone()
            return result is not None
    except pymysql.MySQLError as e:
        print(f"Database error: {e}")
        return False
    finally:
        # Always close the connection
        if connection:
            connection.close()

def check_link_insert(host, user, password, database, link):
    """
    Check whether the given link exists in the `links` table; insert it if it does not.
    :param host: MySQL host address
    :param user: MySQL user name
    :param password: MySQL password
    :param database: database name
    :param link: the link to check
    :return: True if the link already exists, False if it did not exist and was inserted
    """
    connection = None  # make sure connection is initialized
    try:
        # Connect to the MySQL database
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        with connection.cursor() as cursor:
            # Check whether the link exists
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            result = cursor.fetchone()
            if result:
                # The link already exists
                return True
            else:
                # Insert the link
                cursor.execute("INSERT INTO links (link) VALUES (%s)", (link,))
                connection.commit()  # commit the transaction
                print("Link inserted")
                return False
    except pymysql.MySQLError as e:
        print(f"Database error: {e}")
        return False
    finally:
        # Always close the connection
        if connection:
            connection.close()
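A quick usage sketch (hypothetical credentials; assumes the database already has a `links` table with a `link` column):

exists = check_link_insert("localhost", "root", "secret", "toutiao",
                           "https://www.toutiao.com/item/123/")  # hypothetical values
print("already processed" if exists else "recorded as new")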

419 get_web_content.py Normal file

@@ -0,0 +1,419 @@
from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def extract_images_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collect every image link that starts with https://p3-sign.toutiaoimg.com/tos-cn-i
    img_tags = soup.find_all('img')
    img_urls = []
    for img in img_tags:
        for attr in ['src', 'data-src']:
            url = img.get(attr)
            if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
                img_urls.append(url)
    # De-duplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    # Return in JSON-like form
    return {"image": img_urls}
# ============================================================
def get_webpage_source(url):
    """
    Generic helper that fetches a page's HTML source with requests.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        # Add a random delay to mimic human behavior
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, timeout=10)
        response.encoding = 'utf-8'
        # Check the response status
        if response.status_code == 200:
            return response.text
        else:
            print(f"Request failed, status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error while fetching page source: {e}")
        return None
# def get_webpage_source_selenium(url):
#     """
#     Fetch a page's source with Selenium (for sites with dynamically loaded content).
#     """
#     # Configure Chrome options
#     chrome_options = Options()
#     chrome_options.add_argument('--headless')  # headless mode
#     chrome_options.add_argument('--disable-gpu')
#     chrome_options.add_argument('--no-sandbox')
#     chrome_options.add_argument('--disable-dev-shm-usage')
#     chrome_options.add_argument('--disable-blink-features=AutomationControlled')
#     chrome_options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#
#     # Initialize the WebDriver
#     driver = webdriver.Chrome(options=chrome_options)
#
#     try:
#         # Open the URL
#         driver.get(url)
#
#         # Wait for the page to load (adjust the condition as needed)
#         time.sleep(3)  # simple 3-second wait
#
#         # Try to wait for the article content to load
#         try:
#             WebDriverWait(driver, 10).until(
#                 EC.presence_of_element_located((By.TAG_NAME, "article"))
#             )
#         except:
#             print("Timed out waiting for the article element; using the current page content")
#
#         # Grab the page source
#         page_source = driver.page_source
#
#         # Save the source to a file
#         with open("toutiao_source_selenium.html", "w", encoding="utf-8") as f:
#             f.write(page_source)
#
#         return page_source
#     except Exception as e:
#         print(f"Error while fetching page source with Selenium: {e}")
#         return None
#     finally:
#         # Close the browser
#         driver.quit()
# ===================== Content extraction ==================================
# def toutiao_w_extract_content(url):
#     """
#     Extract a Toutiao page's content with requests and BeautifulSoup.
#     """
#     html_content = get_webpage_source_selenium(url)
#
#     # Parse the HTML with BeautifulSoup
#     soup = BeautifulSoup(html_content, 'html.parser')
#
#     # Extract the title and article content
#     article_element = soup.select_one('article')
#
#     if not article_element:
#         # Try other possible selectors
#         article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
#     title_element = soup.select_one('h1') or soup.select_one('.article-title')
#     title_text = title_element.get_text().strip() if title_element else ""
#     article_text = article_element.get_text().strip() if article_element else ""
#
#     # Extract image URLs
#     img_elements = article_element.select('img') if article_element else []
#     img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
#     return title_text, article_text, img_urls
def toutiao_extract_content(url):
    """
    Extract a Toutiao page's title, article text, and image URLs.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs
    # img_selector = "#root > div.article-detail-container > div.main > div.show-monitor article img"
    # img_elements = soup.select(img_selector)
    # # img_elements = article_element.select('img') if article_element else []
    img_urls = extract_images_from_html(html_content)['image']
    # img_urls = [img.get('src') for img in img_elements if img.get('src').startswith("https://p3")]
    return title_text, article_text, img_urls
def wechat_extract_content(url):
    """
    Extract a WeChat Official Account article's content with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content with the known selectors
    title_element = soup.select_one('#activity-name')
    article_element = soup.select_one('#js_content')
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs (keep only those starting with https://mmbiz.qpic.cn)
    img_elements = article_element.select('img') if article_element else []
    img_urls = []
    for img in img_elements:
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)
    return title_text, article_text, img_urls
def wangyi_extract_content(url):
    """
    Extract a NetEase (163.com) page's content with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#contain > div.post_main > h1'
    article_selector = '#content > div.post_body'
    # img_selector = "#content > div.post_body > p > img"
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from within the article body
    img_elements = article_element.select('img') if article_element else []
    # img_urls = extract_images_from_html(html_content)['image']
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
def souhu_extract_content(url):
    """
    Extract a Sohu page's content with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
    article_selector = '#mp-editor'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from within the article body
    # img_selector = "#mp-editor > p > img"
    # img_elements = soup.select(img_selector)
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
def toutiao_w_extract_content(url):
    """
    Optimized Toutiao content extractor.
    Collects only the image links that appear inside the article body.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Title selectors, tried in order of priority
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]
    title_text = ""
    for selector in title_selectors:
        title_element = soup.select_one(selector)
        if title_element:
            title_text = title_element.get_text().strip()
            break
    # Article-content selectors, tried in order of priority
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]
    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break
    # Only extract images that sit inside the article body
    img_urls = []
    if article_element:
        # Find every image element within the article
        img_elements = article_element.find_all('img')
        for img in img_elements:
            # Try the common lazy-loading URL attributes
            for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
                url = img.get(attr)
                if url:
                    # Normalize protocol-relative and root-relative paths
                    if url.startswith('//'):
                        url = 'https:' + url
                    elif url.startswith('/'):
                        url = 'https://www.toutiao.com' + url
                    # Keep only Toutiao-hosted image URLs
                    if any(domain in url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
                        img_urls.append(url)
                    break  # stop after the first usable URL for this <img>
    # Fall back to extract_images_from_html if nothing was found above
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']
    # De-duplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    return title_text, article_text, img_urls
def get_webpage_source_selenium(url):
    """
    Enhanced Selenium page-source fetcher,
    tuned for Toutiao's dynamically loaded pages.
    """
    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    chrome_options.add_argument('--disable-images')  # skip image loading for speed
    chrome_options.add_argument('--disable-javascript')  # note: pages that need JS to render will stay empty with this flag
    chrome_options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url)
        # Wait for the page to load
        time.sleep(5)
        # Try to wait for the key elements
        wait = WebDriverWait(driver, 15)
        try:
            # Wait for the article title
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "h1")))
            # Wait for the article body
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "article")))
        except Exception:
            print("Timed out waiting for key elements; using the current page content")
        # Scroll the page to trigger lazy loading
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, 0);")
        time.sleep(1)
        page_source = driver.page_source
        # # Save the source for debugging
        # with open("toutiao_source_enhanced.html", "w", encoding="utf-8") as f:
        #     f.write(page_source)
        return page_source
    except Exception as e:
        print(f"Error while fetching page source with enhanced Selenium: {e}")
        return None
    finally:
        driver.quit()
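A minimal end-to-end sketch of the extractors (assumes chromedriver is installed and on PATH; the URL is a placeholder):

if __name__ == "__main__":
    url = "https://www.toutiao.com/item/1234567890/"  # placeholder URL
    title, article, images = toutiao_w_extract_content(url)
    print(title)
    print(f"{len(article)} characters, {len(images)} images")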

341 images_edit.py Normal file

@@ -0,0 +1,341 @@
import logging
import os
import random
import requests
from PIL import Image
from PIL import ImageDraw, ImageFont, ImageEnhance
from config import *
from utils import safe_open_directory, safe_filename

IMGS_BASE_PATH = CONFIG['General']['images_path']

def crop_and_replace_images(folder_path):
    """
    Resize images in place after trimming their bottom edge.
    :param folder_path: folder containing the images
    :return:
    """
    print("Processing images...")
    # Walk through every file in the folder
    for filename in os.listdir(folder_path):
        # Only handle JPEG files
        if filename.lower().endswith('.jpg'):
            # Build the full file path
            file_path = os.path.join(folder_path, filename)
            print("Folder path: " + folder_path)
            print("File path: " + file_path)
            # Open the image
            with Image.open(file_path) as img:
                # Get the image dimensions
                width, height = img.size
                # Crop away the bottom 10% of the image
                print("Cropping image...")
                cropped_img = img.crop((0, 0, width, height - (height * 0.1)))
                # Stretch the cropped image back to the original size
                resized_img = cropped_img.resize((width, height))
                # Overwrite the original file (PIL expects the format name 'JPEG', not 'jpg')
                resized_img.save(file_path, 'JPEG')
def deduplicate_images(folder_path):
    """Scan the images under folder_path, modify each one, and overwrite the original files."""
    print("De-duplicating images...")
    if not os.path.exists(folder_path):
        print("Error: the input folder does not exist!")
        return
    supported_ext = ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.webp')
    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(supported_ext):
                file_path = os.path.join(root, file)
                try:
                    with Image.open(file_path) as img:
                        modified_img = modify_image(img)
                        modified_img.save(file_path)  # overwrite the original image
                        print(f"Processed and overwrote: {file_path}")
                except Exception as e:
                    print(f"Error while processing {file_path}: {e}")
def download_image(image_url, save_path):
    """
    Download an image and save it to disk.
    :param image_url: image URL
    :param save_path: destination path
    :return:
    """
    try:
        response = requests.get(image_url)
        if response.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(response.content)
            print(f"Image downloaded, saved to: {save_path}")
        else:
            print(f"Image download failed, status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
def download_and_process_images(img_urls, article_title, save_dir=None):
    """
    Download and process a batch of images.
    :param img_urls: list of image URLs
    :param article_title: article title (used as the folder name)
    :param save_dir: custom save directory; defaults to IMGS_BASE_PATH when None
    """
    if save_dir is None:
        save_dir = IMGS_BASE_PATH
    # Sanitize the article title for use as a directory name
    safe_title = safe_filename(article_title)
    # Normalize the path to avoid separator issues
    img_dir_path = os.path.normpath(os.path.join(str(save_dir), safe_title))
    logger.info(f"Image save path: {img_dir_path}")
    safe_open_directory(img_dir_path)
    for i, img_url in enumerate(img_urls):
        if img_url.startswith("https"):
            imgurl = img_url
        else:
            imgurl = "https:" + img_url
        # Normalize the image path as well
        img_path = os.path.normpath(os.path.join(img_dir_path, f"图片{i}.jpg"))
        try:
            download_image(imgurl, img_path)
            # Process only the image just downloaded, not the whole folder
            with Image.open(img_path) as img:
                modified_img = modify_image(img)
                modified_img.save(img_path)  # overwrite the original image
                print(f"Processed and overwrote: {img_path}")
        except Exception as e:
            logging.error(f"Failed to process image: {e}")
# def download_and_process_images(img_urls, article_title, save_dir=None):
#     """
#     Download and process a batch of images.
#     :param img_urls: list of image URLs
#     :param article_title: article title
#     :param save_dir: custom save directory; defaults to the default directory when None
#     """
#     if save_dir is None:
#         save_dir = IMGS_BASE_PATH
#
#     img_dir_path = os.path.join(str(save_dir), str(article_title))
#     logger.info(f"Image save path: {img_dir_path}")
#     safe_open_directory(img_dir_path)
#
#     for i, img_url in enumerate(img_urls):
#         if img_url.startswith("https"):
#             imgurl = img_url
#         else:
#             imgurl = "https:" + img_url
#         img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
#         try:
#             download_image(imgurl, img_path)
#             # crop_and_replace_images(img_dir_path)
#             deduplicate_images(img_dir_path)
#         except Exception as e:
#             logging.error(f"Failed to process image: {e}")
# def modify_image(img):
#     print("Modifying image")
#     """Apply de-duplication edits: no flipping, only crop, rotate, brightness, watermark, and a translucent overlay."""
#     width, height = img.size
#
#     # Read parameters from the config
#     crop_percent = float(CONFIG['ImageModify']['crop_percent'])
#     min_rotation = float(CONFIG['ImageModify']['min_rotation'])
#     max_rotation = float(CONFIG['ImageModify']['max_rotation'])
#     min_brightness = float(CONFIG['ImageModify']['min_brightness'])
#     max_brightness = float(CONFIG['ImageModify']['max_brightness'])
#     watermark_text = CONFIG['ImageModify']['watermark_text']
#     watermark_opacity = int(CONFIG['ImageModify']['watermark_opacity'])
#     overlay_opacity = int(CONFIG['ImageModify']['overlay_opacity'])
#
#     # 1. Crop the edges
#     crop_px_w = int(width * crop_percent)
#     crop_px_h = int(height * crop_percent)
#     img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))
#
#     # 2. Random rotation
#     angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
#     img = img.rotate(angle, expand=True)
#
#     # 3. Adjust brightness
#     enhancer = ImageEnhance.Brightness(img)
#     factor = random.uniform(min_brightness, max_brightness)  # brightness factor
#     img = enhancer.enhance(factor)
#
#     # 4. Add a text watermark
#     draw = ImageDraw.Draw(img)
#     font_size = max(20, int(min(img.size) * 0.05))
#     try:
#         font = ImageFont.truetype("arial.ttf", font_size)
#     except:
#         font = ImageFont.load_default()
#
#     # Measure the text
#     text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:]
#
#     # Place the watermark in the bottom-right corner
#     x = img.size[0] - text_width - 5
#     y = img.size[1] - text_height - 5
#     draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, watermark_opacity))
#
#     # 5. Add a translucent overlay
#     overlay = Image.new('RGBA', img.size, (255, 255, 255, overlay_opacity))
#     if img.mode != 'RGBA':
#         img = img.convert('RGBA')
#     img = Image.alpha_composite(img, overlay)
#
#     return img.convert('RGB')
def modify_image(img):
    """
    Apply de-duplication edits: no flipping, only cropping, rotation,
    brightness adjustment, a text watermark, and a translucent overlay.
    Args:
        img: PIL.Image to process
    Returns:
        the processed PIL.Image
    """
    print("Modifying image")
    # Make sure the image is in RGB mode
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # Read parameters from the config
    config = CONFIG['ImageModify']
    crop_percent = float(config['crop_percent'])
    min_rotation = float(config['min_rotation'])
    max_rotation = float(config['max_rotation'])
    min_brightness = float(config['min_brightness'])
    max_brightness = float(config['max_brightness'])
    watermark_text = config['watermark_text']
    watermark_opacity = int(config['watermark_opacity'])
    overlay_opacity = int(config['overlay_opacity'])
    # 1. Crop 20px off the bottom of the image
    img = crop_bottom(img, 20)
    # 2. Crop the edges
    img = crop_edges(img, crop_percent)
    # 3. Random rotation
    img = random_rotate(img, min_rotation, max_rotation)
    # 4. Adjust brightness
    img = adjust_brightness(img, min_brightness, max_brightness)
    # 5. Add a text watermark
    img = add_watermark(img, watermark_text, watermark_opacity)
    # 6. Add a translucent overlay
    img = add_overlay(img, overlay_opacity)
    # Return the image in RGB mode
    return img.convert('RGB')
def crop_bottom(img, pixels):
    """
    Crop a fixed number of pixels off the bottom of the image.
    Args:
        img: PIL.Image to crop
        pixels: number of pixels to remove
    Returns:
        the cropped PIL.Image
    """
    width, height = img.size
    if height > pixels:  # only crop if the image is tall enough
        return img.crop((0, 0, width, height - pixels))
    return img

def crop_edges(img, percent):
    """
    Crop the image edges by a given ratio.
    Args:
        img: PIL.Image to crop
        percent: crop ratio between 0 and 1
    Returns:
        the cropped PIL.Image
    """
    width, height = img.size
    crop_px_w = int(width * percent)
    crop_px_h = int(height * percent)
    return img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))

def random_rotate(img, min_rotation, max_rotation):
    """
    Rotate the image by a random angle.
    Args:
        img: PIL.Image to rotate
        min_rotation: minimum rotation angle in degrees
        max_rotation: maximum rotation angle in degrees
    Returns:
        the rotated PIL.Image
    """
    angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
    return img.rotate(angle, expand=True)

def adjust_brightness(img, min_brightness, max_brightness):
    """
    Adjust the image brightness by a random factor.
    Args:
        img: PIL.Image to adjust
        min_brightness: minimum brightness factor
        max_brightness: maximum brightness factor
    Returns:
        the brightness-adjusted PIL.Image
    """
    enhancer = ImageEnhance.Brightness(img)
    factor = random.uniform(min_brightness, max_brightness)
    return enhancer.enhance(factor)

def add_watermark(img, text, opacity):
    """
    Draw a text watermark in the bottom-right corner.
    Args:
        img: PIL.Image to watermark
        text: watermark text
        opacity: watermark alpha (0-255)
    Returns:
        the watermarked PIL.Image
    """
    # RGBA mode is required for transparency
    if img.mode != 'RGBA':
        img = img.convert('RGBA')
    draw = ImageDraw.Draw(img)
    font_size = max(20, int(min(img.size) * 0.05))
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        font = ImageFont.load_default()
    # Measure the text
    text_width, text_height = draw.textbbox((0, 0), text, font=font)[2:]
    # Keep the watermark inside the image bounds
    x = max(5, img.size[0] - text_width - 5)
    y = max(5, img.size[1] - text_height - 5)
    # Draw the watermark
    draw.text((x, y), text, font=font, fill=(255, 255, 255, opacity))
    return img

def add_overlay(img, opacity):
    """
    Composite a translucent white overlay onto the image.
    Args:
        img: PIL.Image to modify
        opacity: overlay alpha (0-255)
    Returns:
        the overlaid PIL.Image
    """
    # RGBA mode is required for transparency
    if img.mode != 'RGBA':
        img = img.convert('RGBA')
    overlay = Image.new('RGBA', img.size, (255, 255, 255, opacity))
    return Image.alpha_composite(img, overlay)
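A short usage sketch of the processing pipeline (hypothetical file path; reads the ImageModify parameters from config.ini):

from PIL import Image

with Image.open("picture/sample.jpg") as img:  # hypothetical path
    processed = modify_image(img)
processed.save("picture/sample.jpg", "JPEG")  # overwrite, as the batch helpers do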

263 main_process.py Normal file

@@ -0,0 +1,263 @@
import threading
import queue
import json  # for parsing the input-data templates
from ai_studio import call_dify_workflow, call_coze_article_workflow, call_coze_all_article_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *

# ============================== Main program ===========================
def process_link(link_info, ai_service, current_template=None, generation_type=None):
    link, article_type = link_info  # unpack the link and its type
    try:
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_w_extract_content(link)
            if title_text == "":
                title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        elif link.startswith("https://www.163.com"):
            title_text, article_text, img_urls = wangyi_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []
        if title_text == "":
            return
        elif len(title_text) > 100:
            return
        # Database configuration
        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']
        # Check the article for banned words
        check_keywords = check_keywords_in_text(title_text)
        title = extract_content_until_punctuation(article_text).replace("正文:", "")
        from datetime import datetime
        # Get and print the current time
        current_time = datetime.now().strftime("%H:%M:%S")
        print("Current time:", current_time)
        message_content = ""  # default so later steps never see an undefined variable
        if ai_service == "dify":
            if check_keywords:
                print("The article contains banned words!")
                check_link_insert(host, user, password, database, link)
                return
            input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"Error building the Dify input_data template: {e}. Falling back to the default template.")
                input_data = {"old_article": article_text}
            message_content = call_dify_workflow(input_data)
        elif ai_service == "coze":
            logger.info("Coze is processing")
            logger.info(f"Generation type being processed: {generation_type}")
            if current_template:
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async']
                }
                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
                logger.info(f"Applied template configuration: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")
            try:
                input_data_template_str = CONFIG['Coze'].get('input_data_template')
                input_data_template = json.loads(input_data_template_str)
                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("Coze input:", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("Original title:", title_text)
                    print("Original content:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("Request payload:", input_data)
                    title, message_content = call_coze_all_article_workflow(input_data)
            finally:
                if 'original_config' in locals():
                    CONFIG['Coze'].update(original_config)
        # Strip leading/trailing whitespace from the title
        title_text = title_text.strip()
        # Create the per-type directory
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)
        # Save the article under the type directory
        file_name = ""
        if generation_type == '短篇':
            file_name = handle_duplicate_files_advanced(type_dir, title_text.strip())[0]
        elif generation_type == "文章":
            file_name = handle_duplicate_files_advanced(type_dir, title.strip())[0]
        article_save_path = os.path.join(type_dir, f"{file_name}.txt")
        if "```" in message_content:
            message_content = message_content.replace("```", "")
        message_content = title + "\n" + message_content
        # Optionally check the article for policy violations (configurable)
        enable_detection = CONFIG['Baidu'].get('enable_detection', 'false').lower() == 'true'
        if enable_detection:
            print("Checking article compliance")
            if text_detection(message_content) == "合规":  # "合规" means compliant
                print("Article is compliant")
                pass
            else:
                print("Article is not compliant")
                return
        else:
            print("Violation detection is disabled; skipping the check")
        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
        logging.info('Article text saved')
        if img_urls:
            # Create the per-type picture directory
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            # Make sure the file name carries no stray spaces
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)
    except Exception as e:
        logging.error(f"Error while processing link {link}: {e}")
        raise
def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None):
    use_link_path = 'use_link_path.txt'
    # Read the links
    links = read_excel(TITLE_BASE_PATH)
    # Filter out links that were already processed
    filtered_links = []
    host = CONFIG['Database']['host']
    user = CONFIG['Database']['user']
    password = CONFIG['Database']['password']
    database = CONFIG['Database']['database']
    for link_info in links:
        link = link_info[0].strip()  # get the link and trim whitespace
        # Use the type from the Excel sheet when present, otherwise the passed-in generation_type
        article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
        logging.info(f"{len(links)} links in total")
        # if check_link_exists(host, user, password, database, link):
        #     logger.info(f"Link already exists: {link}")
        #     continue
        # else:
        filtered_links.append((link, article_type))  # store (link, type) tuples
        # logger.info(f"Link does not exist: {link}")
        # print("Link not seen before; queuing it:", link)
    if not filtered_links:
        logger.info("No new links to process")
        return []
    # Process the links with multiple threads
    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template, generation_type)
    # Record the processed links
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                # each result carries the (link, type) tuple that was queued
                url = link[0] if isinstance(link, tuple) else link
                f.write(url + "\n")
    return results
# Task queue and result queue
task_queue = queue.Queue()
result_queue = queue.Queue()

# Worker thread function
def worker(ai_service, current_template=None, generation_type=None):
    while True:
        try:
            # Fetch a task from the queue
            link = task_queue.get()
            if link is None:  # shutdown signal
                break
            # Process the link
            try:
                logger.info(f"Processing link: {link}")
                process_link(link, ai_service, current_template, generation_type)
                result_queue.put((link, True, None))  # success
            except Exception as e:
                result_queue.put((link, False, str(e)))  # failure
                logger.error(f"Error while processing link {link}: {e}")
            # Mark the task as done
            task_queue.task_done()
        except Exception as e:
            logger.error(f"Worker thread error: {e}")
# Process links with multiple threads
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None, generation_type=None):
    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(num_threads, MAX_THREADS, len(links))
    # Drain the task and result queues
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()
    # Create the worker threads
    threads = []
    # Pass the AI-service choice and template configuration to the workers
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service, current_template, generation_type))
        t.daemon = True
        t.start()
        threads.append(t)
    # Enqueue the tasks
    for link in links:
        task_queue.put(link)
    # Enqueue the shutdown signals
    for _ in range(num_threads):
        task_queue.put(None)
    # Wait for every thread to finish
    for t in threads:
        t.join()
    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results
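A minimal driver sketch (assumes 文章链接.xlsx exists next to the script and config.ini carries valid Coze credentials):

if __name__ == "__main__":
    results = link_to_text(num_threads=2, ai_service="coze", generation_type="文章")
    for link, success, error in results:
        print(link, "OK" if success else f"failed: {error}")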

1 settings.json Normal file

@@ -0,0 +1 @@
{"folder1": "D:/work/python/ArticleReplaceBatch/articles/\u751f\u6d3b", "folder2": "D:/work/python/ArticleReplaceBatch/picture/\u751f\u6d3b", "keep_txt": true}

5 test.py Normal file

@@ -0,0 +1,5 @@
text = "```markdown你好的"
if "```markdown" in text:
text = text.replace("```markdown", "")
print(text)

51 use_link_path.txt Normal file

@@ -0,0 +1,51 @@
https://www.toutiao.com/item/7491909097776857615/
https://www.toutiao.com/item/7491942980174053888/
https://www.toutiao.com/item/7491968674203533863/
https://www.toutiao.com/item/7491961886021026340/
https://www.toutiao.com/item/7492270583044915746/
https://www.toutiao.com/item/7491930239560385065/
https://www.toutiao.com/item/7492298838103966220/
https://www.toutiao.com/item/7491909097776857615/
https://www.toutiao.com/item/7491942980174053888/
https://www.toutiao.com/item/7491968674203533863/
https://www.toutiao.com/item/7491961886021026340/
https://www.toutiao.com/item/7492270583044915746/
https://www.toutiao.com/item/7491930239560385065/
https://www.toutiao.com/item/7492298838103966220/
https://www.toutiao.com/item/7491909097776857615/
https://www.toutiao.com/item/7491942980174053888/
https://www.toutiao.com/item/7491968674203533863/
https://www.toutiao.com/item/7491961886021026340/
https://www.toutiao.com/item/7492270583044915746/
https://www.toutiao.com/item/7491930239560385065/
https://www.toutiao.com/item/7492298838103966220/
https://www.toutiao.com/item/7491942980174053888/
https://www.toutiao.com/item/7491968674203533863/
https://www.toutiao.com/item/7491930239560385065/
https://www.toutiao.com/item/7492298838103966220/
https://www.toutiao.com/item/7496315211876401690/
https://www.toutiao.com/item/7496315211876401690/
https://www.toutiao.com/item/7496315211876401690/
https://www.toutiao.com/item/7496284554789995048/
https://www.toutiao.com/item/7496084587592892969/
https://www.toutiao.com/item/7495928210375377460/
https://www.toutiao.com/item/7494707281880269324/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/
https://www.toutiao.com/item/7501188656259744290/
https://www.toutiao.com/item/7501513738202169919/
https://www.toutiao.com/item/7501459745153483301/

170 utils.py Normal file

@@ -0,0 +1,170 @@
import json
import re
import pandas as pd
import requests
from config import *

def text_detection(text):
    """
    Check text for policy violations with Baidu's text censor API.
    :param text: the text to check
    :return: the API's conclusion (e.g. "合规" for compliant)
    """
    url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token=" + get_baidu_access_token()
    payload = 'text=' + text
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    data = json.loads(response.text)
    print(data)
    conclusion = data['conclusion']
    return conclusion

def get_baidu_access_token():
    """
    Generate an access token from the Baidu AK/SK credentials.
    :return: the access_token, or None on error
    """
    API_KEY = CONFIG['Baidu']['api_key']
    SECRET_KEY = CONFIG['Baidu']['secret_key']
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    return str(requests.post(url, params=params).json().get("access_token"))
def safe_filename(filename):
    """
    Sanitize a file name by removing or replacing unsafe characters.
    """
    # Replace characters that Windows file systems forbid
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        filename = filename.replace(char, '_')
    # Strip leading/trailing spaces and dots
    filename = filename.strip('. ')
    # Fall back to a default name if nothing is left
    if not filename:
        filename = 'untitled'
    return filename

def safe_open_directory(directory_path):
    """
    Safely create a directory, normalizing the path and creating any missing parents.
    """
    try:
        # Normalize the path
        directory_path = os.path.normpath(directory_path)
        if not os.path.exists(directory_path):
            os.makedirs(directory_path, exist_ok=True)
            os.chmod(directory_path, 0o777)
    except Exception as e:
        # Log the failure and re-raise
        logging.error(f"Failed to create directory: {e}")
        raise
def check_keywords_in_text(text):
    """
    Check whether the text contains any banned words.
    :param text: the text to check
    :return: True if a banned word is found, otherwise False
    """
    keywords = CONFIG['Keywords']['banned_words'].split(',')
    for keyword in keywords:
        if keyword.strip() in text:
            return True
    return False

def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """
    Take the prefix of a text up to (and including) the first punctuation mark.
    :param text: input text
    :param punctuations: regex pattern of punctuation marks; defaults to ",。!?;"
    :return: the extracted prefix
    """
    # Find the first punctuation mark
    match = re.search(punctuations, text)
    if match:
        # Found one: keep everything up to and including it
        return text[:match.end()].strip()
    else:
        # No punctuation found: return the whole text
        return text.strip()
# Read the link and type columns from the Excel sheet and return them as a list of tuples
def read_excel(file_name):
    datas = pd.read_excel(file_name)
    first_column_name = datas.columns[0]  # link column
    type_column_name = '领域'  # type column
    links = datas[first_column_name].tolist()
    # Read the type column when it exists, otherwise fall back to a default type
    types = datas[type_column_name].tolist() if type_column_name in datas.columns else ['默认'] * len(links)
    # Zip the links and types into tuples
    result = list(zip(links, types))
    print(result)
    return result
from typing import Tuple

def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Enhanced duplicate-file handling that supports more complex scenarios.
    Args:
        folder_path: folder to check
        filename: original file name
    Returns:
        Tuple[str, bool]: (resolved file name, whether it was renamed)
    """
    # Sanitize illegal characters first
    filename = safe_filename(filename)
    base, ext = os.path.splitext(filename)
    target_path = os.path.join(folder_path, filename)
    if not os.path.exists(target_path):
        return filename, False
    existing_files = set(os.listdir(folder_path))
    pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))
    # Collect the numeric suffixes of all matching files
    numbers = []
    for f in existing_files:
        match = pattern.match(f)
        if match:
            num = int(match.group(2)) if match.group(2) else 0
            numbers.append(num)
    next_num = max(numbers) + 1 if numbers else 1
    new_filename = f"{base}_{next_num}{ext}"
    # Make sure the new name is also unused (guards against concurrency)
    while new_filename in existing_files:
        next_num += 1
        new_filename = f"{base}_{next_num}{ext}"
    return new_filename, True
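A quick usage sketch mirroring how process_link resolves file names (hypothetical folder and title):

name, renamed = handle_duplicate_files_advanced("articles/生活", "示例标题")
save_path = os.path.join("articles/生活", f"{name}.txt")
print(save_path, "(renamed)" if renamed else "(first copy)")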