Files
ArticleReplaceBatch/get_web_content.py
2026-03-25 15:17:18 +08:00

512 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import os
import logging
def extract_images_from_html(html_content):
    """Extract Toutiao CDN image URLs from raw HTML.

    Scans every ``<img>`` tag's ``src``/``data-src`` attributes and keeps
    only links starting with the ``p3-sign.toutiaoimg.com/tos-cn-i`` CDN
    prefix, de-duplicated while preserving first-seen order.

    Args:
        html_content: Raw HTML text to scan.

    Returns:
        dict: ``{"image": [url, ...]}``.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    prefix = "https://p3-sign.toutiaoimg.com/tos-cn-i"
    collected = []
    for tag in soup.find_all('img'):
        for attr in ('src', 'data-src'):
            candidate = tag.get(attr)
            if candidate and candidate.startswith(prefix) and candidate not in collected:
                collected.append(candidate)
    return {"image": collected}
# ============================================================
def get_webpage_source(url):
    """Fetch a page's HTML with a plain requests GET.

    Sends a desktop-browser User-Agent, explicitly bypasses any system
    proxy, and sleeps 1-3 s first to mimic human pacing.

    Args:
        url: Target page URL.

    Returns:
        The decoded HTML text on HTTP 200, otherwise None.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    # Explicit None entries disable any proxy picked up from the environment.
    proxies = {
        'http': None,
        'https': None,
    }
    try:
        # Random delay so consecutive requests look less automated.
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
        response.encoding = 'utf-8'
        if response.status_code == 200:
            return response.text
        # Consistency fix: report failures via logging like the rest of
        # this module, instead of print().
        logging.error(f"请求失败,状态码: {response.status_code}")
        return None
    except Exception as e:
        logging.error(f"获取网页源代码时出错: {e}")
        return None
def get_webpage_source_selenium(url, timeout=30):
    """Fetch a page's fully rendered HTML via headless Chrome.

    Intended for sites whose content is injected by JavaScript and is
    therefore invisible to a plain requests GET. A context manager
    guarantees the WebDriver and its service are shut down on every path.

    Args:
        url: Page to load.
        timeout: Page-load timeout in seconds (default 30).

    Returns:
        The rendered page source on success, otherwise None.
    """
    # Local re-imports shadow the module-level ones; kept so the function
    # stays self-contained if copied elsewhere.
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.chrome.service import Service
    from contextlib import contextmanager

    @contextmanager
    def get_driver():
        """Context manager that guarantees the WebDriver is closed."""
        driver = None
        service = None
        try:
            # Configure Chrome for headless scraping.
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-blink-features=AutomationControlled')
            chrome_options.add_argument('--disable-images')  # skip image downloads for speed
            chrome_options.add_argument('--disable-javascript')  # NOTE(review): disabling JS may defeat the point of Selenium rendering — confirm
            chrome_options.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

            def get_chromedriver_path():
                """Locate a bundled chromedriver, choosing the 32/64-bit build."""
                import sys
                import platform
                # When frozen by PyInstaller, bundled files live in the
                # _MEIPASS temp directory instead of next to the source.
                if getattr(sys, 'frozen', False):
                    base_path = sys._MEIPASS
                else:
                    base_path = os.path.dirname(os.path.abspath(__file__))
                # Pick the driver binary matching the OS architecture.
                is_64bit = platform.machine().endswith('64')
                driver_name = 'chromedriver64.exe' if is_64bit else 'chromedriver32.exe'
                logging.info(f"检测到系统架构: {'64位' if is_64bit else '32位'},将使用 {driver_name}")
                # Candidate locations, most specific first.
                possible_paths = [
                    os.path.join(base_path, 'drivers', driver_name),
                    os.path.join(base_path, 'drivers', 'chromedriver.exe'),  # generic fallback name
                    os.path.join(base_path, driver_name),
                    os.path.join(os.path.dirname(sys.executable), 'drivers', driver_name),
                    os.path.join(os.path.dirname(sys.executable), driver_name),
                ]
                for path in possible_paths:
                    if os.path.exists(path):
                        logging.info(f"找到chromedriver: {path}")
                        return path
                return None

            # Prefer a locally bundled driver; fail loudly if none is found.
            driver_path = get_chromedriver_path()
            if driver_path:
                service = Service(driver_path)
                logging.info(f"使用本地chromedriver: {driver_path}")
            else:
                error_msg = "未找到chromedriver请运行 python setup_driver.py 下载驱动"
                logging.error(error_msg)
                raise FileNotFoundError(error_msg)

            # Start the browser with the configured service and options.
            driver = webdriver.Chrome(service=service, options=chrome_options)
            driver.set_page_load_timeout(timeout)  # hard ceiling on page load
            driver.implicitly_wait(10)  # implicit wait for element lookups
            yield driver
        except Exception as e:
            logging.error(f"WebDriver初始化失败: {e}")
            raise
        finally:
            # Close both the driver and its service, even on failure paths.
            try:
                if driver:
                    driver.quit()
            except Exception as e:
                logging.error(f"关闭WebDriver时出错: {e}")
            try:
                if service:
                    service.stop()
            except AttributeError:
                # Some Service variants expose no stop(); safe to ignore.
                pass
            except Exception as e:
                logging.error(f"停止WebDriver服务时出错: {e}")

    try:
        with get_driver() as driver:
            driver.get(url)
            # Random pause so the visit looks less automated.
            time.sleep(random.uniform(2, 4))
            # Best-effort wait for an <article> element; fall back to
            # whatever has rendered if it never appears.
            try:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "article"))
                )
            except Exception as e:
                logging.warning(f"等待文章元素超时: {e},将使用当前页面内容")
            page_source = driver.page_source
            return page_source
    except Exception as e:
        logging.error(f"使用Selenium获取网页源代码时出错: {e}")
        return None
# =====================采集内容内容==================================
# def toutiao_w_extract_content(url):
# """
# 使用requests和BeautifulSoup提取头条页面内容
# """
# html_content = get_webpage_source_selenium(url)
#
# # 使用BeautifulSoup解析HTML
# soup = BeautifulSoup(html_content, 'html.parser')
#
# # 提取标题和文章内容
# article_element = soup.select_one('article')
#
# if not article_element:
# # 尝试其他可能的选择器
# article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
# title_element = soup.select_one('h1') or soup.select_one('.article-title')
# title_text = title_element.get_text().strip() if title_element else ""
# article_text = article_element.get_text().strip() if article_element else ""
#
# # 提取图片URL
# img_elements = article_element.select('img') if article_element else []
# img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
# return title_text, article_text, img_urls
def toutiao_extract_content(url):
    """Scrape title, body text and image URLs from a Toutiao article page.

    Renders the page with Selenium first (Toutiao content is JS-loaded),
    then pulls the title/body through fixed CSS paths and collects images
    via extract_images_from_html.

    Returns:
        tuple: (title, article_text, image_urls); ("", "", []) when the
        page could not be fetched.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')
    # Fixed CSS paths into Toutiao's article-detail layout.
    title_node = soup.select_one(
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1')
    body_node = soup.select_one(
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article')
    title_text = title_node.get_text().strip() if title_node else ""
    article_text = body_node.get_text().strip() if body_node else ""
    # Images are filtered to the Toutiao CDN by the shared helper.
    img_urls = extract_images_from_html(html_content)['image']
    return title_text, article_text, img_urls
def wechat_extract_content(url):
    """Scrape title, body text and image URLs from a WeChat (mp.weixin) article.

    Uses the stable WeChat element ids (#activity-name, #js_content) and
    keeps only images hosted on the mmbiz.qpic.cn CDN.

    Returns:
        tuple: (title, article_text, image_urls); ("", "", []) when the
        page could not be fetched.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')
    title_node = soup.select_one('#activity-name')
    body_node = soup.select_one('#js_content')
    title_text = title_node.get_text().strip() if title_node else ""
    article_text = body_node.get_text().strip() if body_node else ""
    # Lazy-loaded images carry the real URL in data-src, so fall back to it.
    img_urls = []
    for tag in (body_node.select('img') if body_node else []):
        link = tag.get('src') or tag.get('data-src')
        if link and link.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(link)
    return title_text, article_text, img_urls
def wangyi_extract_content(url):
    """Scrape title, body text and image URLs from a NetEase (163.com) article.

    (The original docstring said "头条"/Toutiao; this extractor actually
    targets 163.com's post layout.)

    Returns:
        tuple: (title, article_text, image_urls); ("", "", []) when the
        page could not be fetched.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')
    # Fixed CSS paths into 163.com's post layout.
    title_element = soup.select_one('#contain > div.post_main > h1')
    article_element = soup.select_one('#content > div.post_body')
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Fix: the original ran a CSS img query whose result was immediately
    # overwritten; only the body-scoped query below was ever used.
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
def souhu_extract_content(url):
    """Scrape title, body text and image URLs from a Sohu article page.

    Renders the page with Selenium, then reads the title through Sohu's
    fixed CSS path and the body from the #mp-editor container.

    Returns:
        tuple: (title, article_text, image_urls); ("", "", []) when the
        page could not be fetched.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')
    title_node = soup.select_one(
        '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1')
    body_node = soup.select_one('#mp-editor')
    title_text = title_node.get_text().strip() if title_node else ""
    article_text = body_node.get_text().strip() if body_node else ""
    # Collect every <img> src found inside the article body.
    img_nodes = body_node.select('img') if body_node else []
    img_urls = [node.get('src') for node in img_nodes if node.get('src')]
    return title_text, article_text, img_urls
def toutiao_w_extract_content(url):
    """Extract title, body text and in-article image URLs from a Toutiao page.

    Tries several title/body CSS selectors in priority order, then walks
    the article's <img> tags checking src plus common lazy-load attributes,
    normalising protocol-/site-relative URLs and keeping only Toutiao CDN
    hosts. Falls back to extract_images_from_html over the whole page when
    the article yields no images.

    Returns:
        tuple: (title, article_text, image_urls); ("", "", []) when the
        page could not be fetched.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')

    # Title selectors, most specific first.
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]
    title_text = ""
    for selector in title_selectors:
        node = soup.select_one(selector)
        if node:
            title_text = node.get_text().strip()
            break

    # Article-body selectors, most specific first.
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]
    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break

    # Collect images only from inside the article body.
    img_urls = []
    if article_element:
        allowed_domains = ('toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com')
        for img in article_element.find_all('img'):
            # Try src plus the common lazy-load attributes in order.
            for attr in ('src', 'data-src', 'data-original', 'data-lazy-src'):
                candidate = img.get(attr)
                if not candidate:
                    continue
                # Normalise protocol-relative and site-relative paths.
                if candidate.startswith('//'):
                    candidate = 'https:' + candidate
                elif candidate.startswith('/'):
                    candidate = 'https://www.toutiao.com' + candidate
                if any(domain in candidate for domain in allowed_domains):
                    img_urls.append(candidate)
                    # Fix: stop only once a *matching* URL is collected, so a
                    # non-CDN src placeholder no longer prevents the data-src
                    # fallbacks from being consulted.
                    break

    # Fallback: scan the entire page with the shared helper.
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']

    # De-duplicate while preserving order.
    img_urls = list(dict.fromkeys(img_urls))
    return title_text, article_text, img_urls
def extract_content_with_retry(url, max_retries=3):
    """Dispatch to the site-specific extractor for *url*, with retries.

    An all-empty extraction result is treated as a soft failure and
    retried, with a 1-3 s random back-off between attempts.

    Args:
        url: Article URL; its host decides which extractor runs.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        tuple: (title, content, images); ("", "", []) if every attempt
        fails or yields an empty result.
    """
    # Note: the redundant function-local `import time` / `import random`
    # were removed — both are already imported at module level.

    def _extract_by_domain(target):
        """Pick the extractor matching the article's host."""
        if "toutiao.com" in target:
            return toutiao_extract_content(target)
        if "mp.weixin.qq.com" in target:
            return wechat_extract_content(target)
        if "163.com" in target:
            return wangyi_extract_content(target)
        if "sohu.com" in target:
            return souhu_extract_content(target)
        # No extractor implemented for this host yet.
        return "", "", []

    for attempt in range(max_retries):
        try:
            title, content, images = _extract_by_domain(url)
            if title or content or images:
                return title, content, images
            logging.warning(f"第 {attempt + 1} 次尝试提取结果为空URL: {url}")
        except Exception as e:
            logging.error(f"第 {attempt + 1} 次尝试失败: {e}")
        # Back off before the next attempt (skip after the final one).
        if attempt < max_retries - 1:
            time.sleep(random.uniform(1, 3))

    logging.error(f"提取内容失败,已重试 {max_retries} 次: {url}")
    return "", "", []