# ArticleReplaceBatch/get_web_content.py
from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import os
import logging
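
# The functions below log through `logging`, but the module never configures
# a handler, so INFO-level messages are dropped under Python's default
# WARNING threshold. A minimal setup sketch; the level and format here are
# assumptions, not part of the original module (basicConfig is a no-op if
# the caller has already configured logging):
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)
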
def extract_images_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collect every image link that starts with https://p3-sign.toutiaoimg.com/tos-cn-i
    img_tags = soup.find_all('img')
    img_urls = []
    for img in img_tags:
        for attr in ['src', 'data-src']:
            url = img.get(attr)
            if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
                img_urls.append(url)
    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    # Return a JSON-style dict
    return {"image": img_urls}
# ============================================================
def get_webpage_source(url):
    """
    Generic helper that fetches a page's HTML source with requests.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    proxies = {
        'http': None,
        'https': None,
    }
    try:
        # Add a random delay to mimic human browsing behaviour
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
        response.encoding = 'utf-8'
        # Check the response status
        if response.status_code == 200:
            return response.text
        else:
            print(f"Request failed, status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching page source: {e}")
        return None
def get_webpage_source_selenium(url, timeout=30):
    """
    Fetch a page's HTML source with Selenium; suited to sites that load
    their content dynamically. Fixes earlier resource-leak and
    exception-handling problems.
    """
    from contextlib import contextmanager

    @contextmanager
    def get_driver():
        """Context manager that guarantees the WebDriver is shut down."""
        driver = None
        service = None
        try:
            # Configure Chrome options
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-blink-features=AutomationControlled')
            chrome_options.add_argument('--disable-images')  # skip image loading for speed
            chrome_options.add_argument('--disable-javascript')  # disable JS if the page does not need it
            chrome_options.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

            def get_chromedriver_path():
                """Locate chromedriver, auto-detecting 32/64-bit architecture."""
                import sys
                import platform
                # Base path differs when running from a PyInstaller bundle
                if getattr(sys, 'frozen', False):
                    base_path = sys._MEIPASS
                else:
                    base_path = os.path.dirname(os.path.abspath(__file__))
                # Detect the system architecture
                is_64bit = platform.machine().endswith('64')
                driver_name = 'chromedriver64.exe' if is_64bit else 'chromedriver32.exe'
                logging.info(f"Detected {'64-bit' if is_64bit else '32-bit'} architecture; using {driver_name}")
                # Candidate driver locations
                possible_paths = [
                    os.path.join(base_path, 'drivers', driver_name),
                    os.path.join(base_path, 'drivers', 'chromedriver.exe'),  # fallback
                    os.path.join(base_path, driver_name),
                    os.path.join(os.path.dirname(sys.executable), 'drivers', driver_name),
                    os.path.join(os.path.dirname(sys.executable), driver_name),
                ]
                for path in possible_paths:
                    if os.path.exists(path):
                        logging.info(f"Found chromedriver: {path}")
                        return path
                return None

            # Prefer a local driver; fail loudly if none is found
            driver_path = get_chromedriver_path()
            if driver_path:
                service = Service(driver_path)
                logging.info(f"Using local chromedriver: {driver_path}")
            else:
                error_msg = "chromedriver not found; run `python setup_driver.py` to download it"
                logging.error(error_msg)
                raise FileNotFoundError(error_msg)

            # Initialise the WebDriver
            driver = webdriver.Chrome(service=service, options=chrome_options)
            driver.set_page_load_timeout(timeout)  # page-load timeout
            driver.implicitly_wait(10)  # implicit wait
            yield driver
        except Exception as e:
            logging.error(f"WebDriver initialisation failed: {e}")
            raise
        finally:
            # Make sure both the driver and the service are shut down
            try:
                if driver:
                    driver.quit()
            except Exception as e:
                logging.error(f"Error quitting WebDriver: {e}")
            try:
                if service:
                    service.stop()
            except AttributeError:
                pass
            except Exception as e:
                logging.error(f"Error stopping the WebDriver service: {e}")

    try:
        with get_driver() as driver:
            # Load the URL
            driver.get(url)
            # Give the page time to finish loading (random delay)
            time.sleep(random.uniform(2, 4))
            # Wait for the article element to appear, if possible
            try:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "article"))
                )
            except Exception as e:
                logging.warning(f"Timed out waiting for the article element: {e}; using the current page content")
            # Return the rendered page source
            return driver.page_source
    except Exception as e:
        logging.error(f"Error fetching page source with Selenium: {e}")
        return None
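
# A minimal sketch of combining the two fetchers: try the lightweight
# requests path first and fall back to Selenium when the static HTML carries
# no <article> tag. The helper name and the heuristic are assumptions, not
# part of the original module.
def get_webpage_source_with_fallback(url):
    html = get_webpage_source(url)
    if html and "<article" in html:
        return html
    return get_webpage_source_selenium(url)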
# ===================== Content extraction ==================================
# def toutiao_w_extract_content(url):
#     """
#     Extract Toutiao page content with requests and BeautifulSoup
#     (earlier version, superseded by the optimised one below).
#     """
#     html_content = get_webpage_source_selenium(url)
#
#     # Parse the HTML with BeautifulSoup
#     soup = BeautifulSoup(html_content, 'html.parser')
#
#     # Extract the title and article body
#     article_element = soup.select_one('article')
#
#     if not article_element:
#         # Try other likely selectors
#         article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
#     title_element = soup.select_one('h1') or soup.select_one('.article-title')
#     title_text = title_element.get_text().strip() if title_element else ""
#     article_text = article_element.get_text().strip() if article_element else ""
#
#     # Extract image URLs
#     img_elements = article_element.select('img') if article_element else []
#     img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
#     return title_text, article_text, img_urls
def toutiao_extract_content(url):
    """
    Extract Toutiao page content from Selenium-rendered HTML with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article body
    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs; a direct CSS selector was tried here previously:
    # img_selector = "#root > div.article-detail-container > div.main > div.show-monitor article img"
    img_urls = extract_images_from_html(html_content)['image']
    return title_text, article_text, img_urls
def wechat_extract_content(url):
    """
    Extract WeChat Official Account page content from Selenium-rendered HTML
    with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article body with the known WeChat selectors
    title_element = soup.select_one('#activity-name')
    article_element = soup.select_one('#js_content')
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from the article body (keep only those that start
    # with https://mmbiz.qpic.cn)
    img_elements = article_element.select('img') if article_element else []
    img_urls = []
    for img in img_elements:
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)
    return title_text, article_text, img_urls
def wangyi_extract_content(url):
    """
    Extract NetEase (163.com) page content from Selenium-rendered HTML
    with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article body
    title_selector = '#contain > div.post_main > h1'
    article_selector = '#content > div.post_body'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from the article body
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
def souhu_extract_content(url):
    """
    Extract Sohu page content from Selenium-rendered HTML with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article body
    title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
    article_selector = '#mp-editor'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from the article body
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
def toutiao_w_extract_content(url):
    """
    Optimised Toutiao extraction: pulls the title, the article body, and
    only the images that appear inside the article content.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Candidate title selectors, tried in priority order
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]
    title_text = ""
    for selector in title_selectors:
        title_element = soup.select_one(selector)
        if title_element:
            title_text = title_element.get_text().strip()
            break
    # Candidate article-body selectors, tried in priority order
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]
    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break
    # Collect images only from inside the article body
    img_urls = []
    if article_element:
        img_elements = article_element.find_all('img')
        for img in img_elements:
            # Try the common lazy-loading attributes in turn
            for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
                img_url = img.get(attr)
                if img_url:
                    # Normalise protocol-relative and root-relative paths
                    if img_url.startswith('//'):
                        img_url = 'https:' + img_url
                    elif img_url.startswith('/'):
                        img_url = 'https://www.toutiao.com' + img_url
                    # Keep only Toutiao-hosted image URLs
                    if any(domain in img_url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
                        img_urls.append(img_url)
                    break  # stop at the first attribute that yields a URL
    # Fall back to extract_images_from_html if nothing was found
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']
    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    return title_text, article_text, img_urls
def extract_content_with_retry(url, max_retries=3):
    """
    Generic content extractor with a retry mechanism.

    Args:
        url: article URL
        max_retries: maximum number of attempts

    Returns:
        tuple: (title, content, images)
    """

    def _extract_by_domain(url):
        """Pick the extraction function that matches the URL's domain."""
        if "toutiao.com" in url:
            return toutiao_extract_content(url)
        elif "mp.weixin.qq.com" in url:
            return wechat_extract_content(url)
        elif "163.com" in url:
            return wangyi_extract_content(url)
        elif "sohu.com" in url:
            return souhu_extract_content(url)
        else:
            # No generic extractor is implemented for other domains
            return "", "", []

    # Retry loop
    for attempt in range(max_retries):
        try:
            title, content, images = _extract_by_domain(url)
            # Accept the result if anything was extracted
            if title or content or images:
                return title, content, images
            else:
                logging.warning(f"Attempt {attempt + 1} returned an empty result, URL: {url}")
        except Exception as e:
            logging.error(f"Attempt {attempt + 1} failed: {e}")
        # Wait before retrying
        if attempt < max_retries - 1:
            time.sleep(random.uniform(1, 3))
    # All retries failed
    logging.error(f"Failed to extract content after {max_retries} attempts: {url}")
    return "", "", []