# ArticleReplace/get_web_content.py
from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import os
import logging
import platform

def extract_images_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collect every image link that starts with the Toutiao CDN prefix
    img_tags = soup.find_all('img')
    img_urls = []
    for img in img_tags:
        for attr in ['src', 'data-src']:
            url = img.get(attr)
            if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
                img_urls.append(url)
    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    # Return as a JSON-style dict
    return {"image": img_urls}
# ============================================================
def get_webpage_source(url):
    """
    General-purpose fetcher for a page's HTML source, using requests.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        # Random delay to mimic human browsing behavior
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, timeout=10)
        response.encoding = 'utf-8'
        # Check the response status
        if response.status_code == 200:
            return response.text
        else:
            print(f"Request failed, status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error while fetching page source: {e}")
        return None
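
# Hypothetical convenience wrapper (not part of the original module): a sketch
# of retrying get_webpage_source with exponential backoff, assuming that a
# None return (timeout or non-200 status) is transient and worth retrying.
def get_webpage_source_with_retry(url, retries=3, backoff=2.0):
    for attempt in range(retries):
        html = get_webpage_source(url)
        if html is not None:
            return html
        # Back off before the next attempt: 2s, 4s, 8s, ...
        time.sleep(backoff * (2 ** attempt))
    return None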

# def get_webpage_source_selenium(url):
#     """
#     Fetch a page's source with Selenium, for sites whose content loads dynamically.
#     """
#     # Configure Chrome options
#     chrome_options = Options()
#     chrome_options.add_argument('--headless')  # headless mode
#     chrome_options.add_argument('--disable-gpu')
#     chrome_options.add_argument('--no-sandbox')
#     chrome_options.add_argument('--disable-dev-shm-usage')
#     chrome_options.add_argument('--disable-blink-features=AutomationControlled')
#     chrome_options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#
#     # Initialize the WebDriver
#     driver = webdriver.Chrome(options=chrome_options)
#
#     try:
#         # Open the URL
#         driver.get(url)
#
#         # Wait for the page to finish loading (adjust the condition as needed)
#         time.sleep(3)  # simple 3-second wait
#
#         # Try waiting for the article content to load
#         try:
#             WebDriverWait(driver, 10).until(
#                 EC.presence_of_element_located((By.TAG_NAME, "article"))
#             )
#         except Exception:
#             print("Timed out waiting for the article element; using current page content")
#
#         # Grab the page source
#         page_source = driver.page_source
#
#         # Save the source to a file
#         with open("toutiao_source_selenium.html", "w", encoding="utf-8") as f:
#             f.write(page_source)
#
#         return page_source
#     except Exception as e:
#         print(f"Error while fetching page source with Selenium: {e}")
#         return None
#     finally:
#         # Close the browser
#         driver.quit()
# ===================== Content scraping ==================================
# def toutiao_w_extract_content(url):
#     """
#     Extract a Toutiao page's content with BeautifulSoup.
#     """
#     html_content = get_webpage_source_selenium(url)
#
#     # Parse the HTML with BeautifulSoup
#     soup = BeautifulSoup(html_content, 'html.parser')
#
#     # Extract the title and article content
#     article_element = soup.select_one('article')
#
#     if not article_element:
#         # Try other likely selectors
#         article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
#     title_element = soup.select_one('h1') or soup.select_one('.article-title')
#     title_text = title_element.get_text().strip() if title_element else ""
#     article_text = article_element.get_text().strip() if article_element else ""
#
#     # Extract image URLs
#     img_elements = article_element.select('img') if article_element else []
#     img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
#     return title_text, article_text, img_urls

def toutiao_extract_content(url):
    """
    Extract a Toutiao article's title, body text, and image URLs.
    The page source is fetched with Selenium and parsed with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs (a selector-based alternative is kept below for reference)
    # img_selector = "#root > div.article-detail-container > div.main > div.show-monitor article img"
    # img_elements = soup.select(img_selector)
    img_urls = extract_images_from_html(html_content)['image']
    # img_urls = [img.get('src') for img in img_elements if img.get('src').startswith("https://p3")]
    return title_text, article_text, img_urls
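
# Usage sketch (the URL is a placeholder, not a real article):
#
#   title, text, imgs = toutiao_extract_content("https://www.toutiao.com/article/<article-id>/")
#   print(title, len(text), len(imgs))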

def wechat_extract_content(url):
    """
    Extract a WeChat Official Account article's content.
    The page source is fetched with Selenium and parsed with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content with the known WeChat selectors
    title_element = soup.select_one('#activity-name')
    article_element = soup.select_one('#js_content')
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from the article body (keep only those starting with https://mmbiz.qpic.cn)
    img_elements = article_element.select('img') if article_element else []
    img_urls = []
    for img in img_elements:
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)
    return title_text, article_text, img_urls

def wangyi_extract_content(url):
    """
    Extract a NetEase (163.com) article's content.
    The page source is fetched with Selenium and parsed with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#contain > div.post_main > h1'
    article_selector = '#content > div.post_body'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from within the article body. (The original also ran
    # soup.select("#content > div.post_body > p > img") here, but that result
    # was immediately overwritten, so only the article_element query is kept.)
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls

def souhu_extract_content(url):
    """
    Extract a Sohu article's content.
    The page source is fetched with Selenium and parsed with BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)
    # Parse the HTML with BeautifulSoup (a debug print(soup) was removed here)
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the title and article content
    title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
    article_selector = '#mp-editor'
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    # Extract image URLs from within the article body
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls

def toutiao_w_extract_content(url):
    """
    Optimized Toutiao content extractor.
    Collects image links from within the article body only.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    # Title selectors, tried in priority order
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]
    title_text = ""
    for selector in title_selectors:
        title_element = soup.select_one(selector)
        if title_element:
            title_text = title_element.get_text().strip()
            break
    # Article-content selectors, tried in priority order
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]
    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break
    # Extract images only from within the article body
    img_urls = []
    if article_element:
        # Find every image element inside the article content
        img_elements = article_element.find_all('img')
        for img in img_elements:
            # Try the various attributes an image URL may live in
            for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
                url = img.get(attr)
                if url:
                    # Normalize protocol-relative and root-relative paths
                    if url.startswith('//'):
                        url = 'https:' + url
                    elif url.startswith('/'):
                        url = 'https://www.toutiao.com' + url
                    # Keep only Toutiao-hosted image URLs
                    if any(domain in url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
                        img_urls.append(url)
                    break  # one usable URL found; stop checking attributes
    # If nothing was found above, fall back to extract_images_from_html
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']
    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    return title_text, article_text, img_urls
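
# Hypothetical companion helper (not in the original module): download the
# extracted image URLs into a local directory. A sketch, assuming the URLs
# are plain HTTPS resources fetchable without signed cookies or referer checks.
def download_images(img_urls, out_dir="images"):
    os.makedirs(out_dir, exist_ok=True)
    saved = []
    for i, img_url in enumerate(img_urls):
        try:
            resp = requests.get(img_url, timeout=10)
            if resp.status_code == 200:
                path = os.path.join(out_dir, f"img_{i}.jpg")
                with open(path, "wb") as f:
                    f.write(resp.content)
                saved.append(path)
        except Exception as e:
            logging.warning(f"Failed to download {img_url}: {e}")
    return saved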

def get_webpage_source_selenium(url):
    """
    Enhanced Selenium page-source fetcher.
    Tuned for Toutiao's dynamically loaded pages.
    Uses webdriver-manager to manage ChromeDriver, storing the driver in the project directory.
    """
    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    chrome_options.add_argument('--disable-images')  # skip image loading for speed
    # The original also added '--disable-javascript' ("disable JS if it is not
    # needed"); it is commented out here because these pages render their
    # content via JavaScript, which the element waits below depend on.
    # chrome_options.add_argument('--disable-javascript')
    chrome_options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    # Use webdriver-manager to manage ChromeDriver, saved under the project's drivers directory
    try:
        # Resolve the drivers directory path
        project_root = os.getcwd()
        drivers_path = os.path.join(project_root, "drivers")
        # Make sure the drivers directory exists
        os.makedirs(drivers_path, exist_ok=True)
        # Check whether a compatible ChromeDriver already lives under drivers/
        chromedriver_path = None
        if os.path.exists(drivers_path):
            for root, dirs, files in os.walk(drivers_path):
                for file in files:
                    # Match filenames that look like a ChromeDriver binary
                    if file.lower().startswith("chromedriver"):
                        candidate_path = os.path.join(root, file)
                        # Check that the file is executable
                        if os.access(candidate_path, os.X_OK) or file.lower().endswith('.exe'):
                            # On Windows, make sure it is a .exe file
                            if platform.system() == 'Windows' and not file.lower().endswith('.exe'):
                                candidate_path += '.exe'
                            # Confirm the file actually exists
                            if os.path.exists(candidate_path):
                                chromedriver_path = candidate_path
                                break
                if chromedriver_path:
                    break
        # If no compatible ChromeDriver was found, download one with webdriver-manager
        if not chromedriver_path:
            try:
                # Download a ChromeDriver matching this system into the drivers directory
                from webdriver_manager.core.driver_cache import DriverCacheManager
                # Custom cache manager pointing at the project-local path
                cache_manager = DriverCacheManager(root_dir=drivers_path)
                # Resolve the ChromeDriver path
                chromedriver_path = ChromeDriverManager(cache_manager=cache_manager).install()
                logging.info(f"ChromeDriver downloaded to: {chromedriver_path}")
            except Exception as download_error:
                logging.warning(f"Downloading ChromeDriver with webdriver-manager failed: {download_error}")
                # Fall back to a ChromeDriver on the system PATH
                chromedriver_path = None
        # Use the resolved ChromeDriver path
        if chromedriver_path and os.path.exists(chromedriver_path):
            try:
                service = Service(chromedriver_path)
                driver = webdriver.Chrome(service=service, options=chrome_options)
            except Exception as service_error:
                logging.warning(f"Using the local ChromeDriver failed: {service_error}")
                # Try re-downloading with webdriver-manager
                try:
                    chromedriver_path = ChromeDriverManager().install()
                    service = Service(chromedriver_path)
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                except Exception as fallback_error:
                    logging.warning(f"Re-downloading with webdriver-manager also failed: {fallback_error}")
                    # Last-resort fallback
                    driver = webdriver.Chrome(options=chrome_options)
        else:
            # If everything above failed, try a ChromeDriver from the system PATH
            logging.warning("No compatible ChromeDriver found; trying the one on the system PATH")
            driver = webdriver.Chrome(options=chrome_options)
    except Exception as e:
        # Last-resort fallback
        logging.warning(f"ChromeDriver initialization failed; using the default setup: {e}")
        driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url)
        # Give the page time to load
        time.sleep(5)
        # Try waiting for the key elements to appear
        wait = WebDriverWait(driver, 15)
        try:
            # Wait for the article title
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "h1")))
            # Wait for the article body
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "article")))
        except Exception:
            print("Timed out waiting for key elements; using current page content")
        # Scroll the page to trigger lazy loading
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, 0);")
        time.sleep(1)
        page_source = driver.page_source
        # # Save the source for debugging
        # with open("toutiao_source_enhanced.html", "w", encoding="utf-8") as f:
        #     f.write(page_source)
        return page_source
    except Exception as e:
        print(f"Error while fetching page source with the enhanced Selenium fetcher: {e}")
        return None
    finally:
        try:
            driver.quit()
        except Exception:
            pass
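
# Minimal manual-test entry point (not in the original file). The URL and the
# domain-to-extractor mapping are illustrative; substitute a real article link
# to exercise the matching extractor.
if __name__ == "__main__":
    extractors = {
        "toutiao.com": toutiao_w_extract_content,
        "mp.weixin.qq.com": wechat_extract_content,
        "163.com": wangyi_extract_content,
        "sohu.com": souhu_extract_content,
    }
    test_url = "https://www.toutiao.com/article/<article-id>/"
    for domain, extract in extractors.items():
        if domain in test_url:
            title, text, imgs = extract(test_url)
            print(f"title={title!r}, chars={len(text)}, images={len(imgs)}")
            break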