ArticleReplace/get_web_content.py

500 lines
18 KiB
Python
Raw Normal View History

2025-10-25 16:45:02 +08:00
from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
2025-11-29 14:37:27 +08:00
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import os
import logging
2025-10-25 16:45:02 +08:00
def extract_images_from_html(html_content):
soup = BeautifulSoup(html_content, 'html.parser')
# 匹配所有以 https://p3-sign.toutiaoi 开头的图片链接
img_tags = soup.find_all('img')
img_urls = []
for img in img_tags:
for attr in ['src', 'data-src']:
url = img.get(attr)
if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
img_urls.append(url)
# 去重处理
img_urls = list(dict.fromkeys(img_urls))
# 返回 JSON 格式
return {"image": img_urls}
# ============================================================
def get_webpage_source(url):
"""
获取网页源代码的通用函数
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
try:
# 添加随机延迟,模拟人类行为
time.sleep(random.uniform(1, 3))
response = requests.get(url, headers=headers, timeout=10)
response.encoding = 'utf-8'
# 检查响应状态
if response.status_code == 200:
return response.text
else:
print(f"请求失败,状态码: {response.status_code}")
return None
except Exception as e:
print(f"获取网页源代码时出错: {e}")
return None
# def get_webpage_source_selenium(url):
# """
# 使用Selenium获取网页源代码适用于动态加载内容的网站
# """
# # 配置Chrome选项
# chrome_options = Options()
# chrome_options.add_argument('--headless') # 无头模式
# chrome_options.add_argument('--disable-gpu')
# chrome_options.add_argument('--no-sandbox')
# chrome_options.add_argument('--disable-dev-shm-usage')
# chrome_options.add_argument('--disable-blink-features=AutomationControlled')
# chrome_options.add_argument(
# 'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#
# # 初始化WebDriver
# driver = webdriver.Chrome(options=chrome_options)
#
# try:
# # 访问URL
# driver.get(url)
#
# # 等待页面加载完成(可根据实际情况调整等待条件)
# time.sleep(3) # 简单等待3秒
#
# # 尝试等待文章内容加载
# try:
# WebDriverWait(driver, 10).until(
# EC.presence_of_element_located((By.TAG_NAME, "article"))
# )
# except:
# print("等待文章元素超时,将使用当前页面内容")
#
# # 获取页面源代码
# page_source = driver.page_source
#
# # 保存源代码到文件
# with open("toutiao_source_selenium.html", "w", encoding="utf-8") as f:
# f.write(page_source)
#
# return page_source
# except Exception as e:
# print(f"使用Selenium获取网页源代码时出错: {e}")
# return None
# finally:
# # 关闭浏览器
# driver.quit()
# =====================采集内容内容==================================
# def toutiao_w_extract_content(url):
# """
# 使用requests和BeautifulSoup提取头条页面内容
# """
# html_content = get_webpage_source_selenium(url)
#
# # 使用BeautifulSoup解析HTML
# soup = BeautifulSoup(html_content, 'html.parser')
#
# # 提取标题和文章内容
# article_element = soup.select_one('article')
#
# if not article_element:
# # 尝试其他可能的选择器
# article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
# title_element = soup.select_one('h1') or soup.select_one('.article-title')
# title_text = title_element.get_text().strip() if title_element else ""
# article_text = article_element.get_text().strip() if article_element else ""
#
# # 提取图片URL
# img_elements = article_element.select('img') if article_element else []
# img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
# return title_text, article_text, img_urls
def toutiao_extract_content(url):
"""
使用requests和BeautifulSoup提取头条页面内容
"""
html_content = get_webpage_source_selenium(url)
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 提取标题和文章内容
title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'
title_element = soup.select_one(title_selector)
article_element = soup.select_one(article_selector)
title_text = title_element.get_text().strip() if title_element else ""
article_text = article_element.get_text().strip() if article_element else ""
# 提取图片URL
# img_selector = "#root > div.article-detail-container > div.main > div.show-monitor article img"
# img_elements = soup.select(img_selector)
# # img_elements = article_element.select('img') if article_element else []
img_urls = extract_images_from_html(html_content)['image']
# img_urls = [img.get('src') for img in img_elements if img.get('src').startswith("https://p3")]
return title_text, article_text, img_urls
def wechat_extract_content(url):
"""
使用requests和BeautifulSoup提取微信公众号页面内容
"""
html_content = get_webpage_source_selenium(url)
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 使用指定的选择器提取标题和文章内容
title_element = soup.select_one('#activity-name')
article_element = soup.select_one('#js_content')
title_text = title_element.get_text().strip() if title_element else ""
article_text = article_element.get_text().strip() if article_element else ""
# 提取特定 section 中的图片 URL仅保留以 https://mmbiz.qpic.cn 开头的)
img_elements = article_element.select('img') if article_element else []
img_urls = []
for img in img_elements:
src = img.get('src') or img.get('data-src')
if src and src.startswith('https://mmbiz.qpic.cn'):
img_urls.append(src)
return title_text, article_text, img_urls
def wangyi_extract_content(url):
"""
使用requests和BeautifulSoup提取头条页面内容
"""
html_content = get_webpage_source_selenium(url)
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 提取标题和文章内容
title_selector = '#contain > div.post_main > h1'
article_selector = '#content > div.post_body'
# img_selector = "#content > div.post_body > p > img"
title_element = soup.select_one(title_selector)
article_element = soup.select_one(article_selector)
title_text = title_element.get_text().strip() if title_element else ""
article_text = article_element.get_text().strip() if article_element else ""
# 提取图片URL
img_selector = "#content > div.post_body > p > img"
img_elements = soup.select(img_selector)
img_elements = article_element.select('img') if article_element else []
# img_urls = extract_images_from_html(html_content)['image']
img_urls = [img.get('src') for img in img_elements if img.get('src')]
return title_text, article_text, img_urls
def souhu_extract_content(url):
"""
使用requests和BeautifulSoup提取头条页面内容
"""
html_content = get_webpage_source_selenium(url)
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
print(soup)
# 提取标题和文章内容
title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
article_selector = '#mp-editor'
# img_selector = "#content > div.post_body > p > img"
title_element = soup.select_one(title_selector)
article_element = soup.select_one(article_selector)
title_text = title_element.get_text().strip() if title_element else ""
article_text = article_element.get_text().strip() if article_element else ""
# 提取图片URL
# img_selector = "#mp-editor > p > img"
# img_elements = soup.select(img_selector)
img_elements = article_element.select('img') if article_element else []
img_urls = [img.get('src') for img in img_elements if img.get('src')]
return title_text, article_text, img_urls
def toutiao_w_extract_content(url):
"""
优化后的头条页面内容提取函数
专门获取文章内容中的图片链接
"""
html_content = get_webpage_source_selenium(url)
if not html_content:
print("获取HTML内容失败")
return "", "", []
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 多种标题选择器,按优先级尝试
title_selectors = [
'#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
'h1.article-title',
'h1[data-testid="headline"]',
'.article-title h1',
'.article-header h1',
'article h1',
'h1'
]
title_text = ""
for selector in title_selectors:
title_element = soup.select_one(selector)
if title_element:
title_text = title_element.get_text().strip()
break
# 多种文章内容选择器,按优先级尝试
article_selectors = [
'#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
'article',
'.article-content',
'.content',
'#js_content',
'.post_body',
'[data-testid="article-content"]'
]
article_text = ""
article_element = None
for selector in article_selectors:
article_element = soup.select_one(selector)
if article_element:
article_text = article_element.get_text().strip()
break
# 只从文章内容中提取图片
img_urls = []
if article_element:
# 查找文章内容中的所有图片元素
img_elements = article_element.find_all('img')
for img in img_elements:
# 尝试多种可能的图片URL属性
for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
url = img.get(attr)
if url:
# 处理相对路径
if url.startswith('//'):
url = 'https:' + url
elif url.startswith('/'):
url = 'https://www.toutiao.com' + url
# 只收集头条相关的图片URL
if any(domain in url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
img_urls.append(url)
break # 找到一个有效URL就跳出内层循环
# 如果上面没有找到图片尝试使用现有的extract_images_from_html函数作为备选
if not img_urls:
extracted_imgs = extract_images_from_html(html_content)
if extracted_imgs and 'image' in extracted_imgs:
img_urls = extracted_imgs['image']
# 去重处理
img_urls = list(dict.fromkeys(img_urls))
return title_text, article_text, img_urls
def get_webpage_source_selenium(url):
"""
增强版的Selenium获取网页源代码函数
专门针对头条网站的动态加载特性进行优化
2025-11-29 14:37:27 +08:00
使用webdriver-manager自动管理ChromeDriver并将驱动保存到项目目录
2025-10-25 16:45:02 +08:00
"""
2025-11-29 14:37:27 +08:00
import platform
2025-10-25 16:45:02 +08:00
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_argument('--disable-images') # 禁用图片加载以提高速度
chrome_options.add_argument('--disable-javascript') # 如果不需要JS可以禁用
chrome_options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
2025-11-29 14:37:27 +08:00
# 使用webdriver-manager自动管理ChromeDriver并保存到项目drivers目录
try:
# 设置drivers目录路径
project_root = os.getcwd()
drivers_path = os.path.join(project_root, "drivers")
# 确保drivers目录存在
os.makedirs(drivers_path, exist_ok=True)
# 检查drivers目录下是否已有兼容的ChromeDriver
chromedriver_path = None
if os.path.exists(drivers_path):
for root, dirs, files in os.walk(drivers_path):
for file in files:
# 检查文件名是否包含ChromeDriver并且与当前系统架构匹配
if file.lower().startswith("chromedriver"):
candidate_path = os.path.join(root, file)
# 检查文件是否可执行
if os.access(candidate_path, os.X_OK) or file.lower().endswith('.exe'):
# 对于Windows系统确保是.exe文件
if platform.system() == 'Windows' and not file.lower().endswith('.exe'):
candidate_path += '.exe'
# 检查文件是否存在且可执行
if os.path.exists(candidate_path):
chromedriver_path = candidate_path
break
if chromedriver_path:
break
# 如果drivers目录下没有兼容的ChromeDriver则使用webdriver-manager下载并保存
if not chromedriver_path:
try:
# 使用webdriver-manager下载适合当前系统的ChromeDriver到drivers目录
from webdriver_manager.core.driver_cache import DriverCacheManager
# 创建自定义缓存管理器,指定保存路径
cache_manager = DriverCacheManager(root_dir=drivers_path)
# 获取ChromeDriver路径
chromedriver_path = ChromeDriverManager(cache_manager=cache_manager).install()
logging.info(f"ChromeDriver已下载到: {chromedriver_path}")
except Exception as download_error:
logging.warning(f"使用webdriver-manager下载ChromeDriver失败: {download_error}")
# 尝试使用系统PATH中的ChromeDriver
chromedriver_path = None
# 使用找到的ChromeDriver路径
if chromedriver_path and os.path.exists(chromedriver_path):
try:
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
except Exception as service_error:
logging.warning(f"使用本地ChromeDriver失败: {service_error}")
# 尝试使用webdriver-manager重新下载
try:
chromedriver_path = ChromeDriverManager().install()
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
except Exception as fallback_error:
logging.warning(f"使用webdriver-manager重新下载也失败: {fallback_error}")
# 最后的备选方案
driver = webdriver.Chrome(options=chrome_options)
else:
# 如果所有方法都失败了尝试使用系统PATH中的ChromeDriver
logging.warning("无法找到兼容的ChromeDriver尝试使用系统PATH中的ChromeDriver")
driver = webdriver.Chrome(options=chrome_options)
except Exception as e:
# 最后的备选方案
logging.warning(f"ChromeDriver初始化失败使用默认方式: {e}")
driver = webdriver.Chrome(options=chrome_options)
2025-10-25 16:45:02 +08:00
try:
driver.get(url)
# 等待页面加载完成
time.sleep(5)
# 尝试等待关键元素加载
wait = WebDriverWait(driver, 15)
try:
# 等待文章标题加载
wait.until(EC.presence_of_element_located((By.TAG_NAME, "h1")))
# 等待文章内容加载
wait.until(EC.presence_of_element_located((By.TAG_NAME, "article")))
except:
print("等待关键元素超时,使用当前页面内容")
# 滚动页面以触发懒加载
driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
time.sleep(2)
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
driver.execute_script("window.scrollTo(0, 0);")
time.sleep(1)
page_source = driver.page_source
# # 保存源代码用于调试
# with open("toutiao_source_enhanced.html", "w", encoding="utf-8") as f:
# f.write(page_source)
return page_source
except Exception as e:
print(f"使用增强版Selenium获取网页源代码时出错: {e}")
return None
finally:
2025-11-29 14:37:27 +08:00
try:
driver.quit()
except:
pass
2025-10-25 16:45:02 +08:00