from bs4 import BeautifulSoup
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import os
import logging

def extract_images_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')

    # Collect all image links that start with https://p3-sign.toutiaoimg.com/tos-cn-i
    img_tags = soup.find_all('img')
    img_urls = []

    for img in img_tags:
        for attr in ['src', 'data-src']:
            url = img.get(attr)
            if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
                img_urls.append(url)

    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))

    # Return a JSON-style dict
    return {"image": img_urls}

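# A minimal sketch of the expected behaviour on a hypothetical HTML snippet
# (not fetched from a real page):
#
#     sample = ('<img src="https://p3-sign.toutiaoimg.com/tos-cn-i/abc.jpg">'
#               '<img src="https://example.com/x.png">')
#     extract_images_from_html(sample)
#     # -> {'image': ['https://p3-sign.toutiaoimg.com/tos-cn-i/abc.jpg']}
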
# ============================================================
def get_webpage_source(url):
    """
    Generic helper that fetches a page's HTML source with requests.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }

    try:
        # Add a random delay to mimic human browsing
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, timeout=10)
        response.encoding = 'utf-8'

        # Check the response status
        if response.status_code == 200:
            return response.text
        else:
            print(f"Request failed, status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error while fetching page source: {e}")
        return None

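# Scraping often hits transient network errors, so a thin retry wrapper around
# get_webpage_source can help. This is a sketch, not part of the original
# flow; the attempt count and backoff base are arbitrary choices.
def get_webpage_source_with_retries(url, attempts=3, backoff=2.0):
    for attempt in range(1, attempts + 1):
        html = get_webpage_source(url)
        if html is not None:
            return html
        # Linearly growing backoff with a little jitter between attempts
        time.sleep(backoff * attempt + random.uniform(0, 1))
    return None
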
# Earlier, simpler Selenium fetcher, kept commented out; superseded by the
# enhanced get_webpage_source_selenium defined at the bottom of the file.
# def get_webpage_source_selenium(url):
#     """
#     Fetch a page's source with Selenium, for sites with dynamically loaded content.
#     """
#     # Configure Chrome options
#     chrome_options = Options()
#     chrome_options.add_argument('--headless')  # headless mode
#     chrome_options.add_argument('--disable-gpu')
#     chrome_options.add_argument('--no-sandbox')
#     chrome_options.add_argument('--disable-dev-shm-usage')
#     chrome_options.add_argument('--disable-blink-features=AutomationControlled')
#     chrome_options.add_argument(
#         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
#
#     # Initialize the WebDriver
#     driver = webdriver.Chrome(options=chrome_options)
#
#     try:
#         # Open the URL
#         driver.get(url)
#
#         # Wait for the page to load (tune the wait condition as needed)
#         time.sleep(3)  # simple 3-second wait
#
#         # Try to wait for the article content to load
#         try:
#             WebDriverWait(driver, 10).until(
#                 EC.presence_of_element_located((By.TAG_NAME, "article"))
#             )
#         except Exception:
#             print("Timed out waiting for the article element; using the current page content")
#
#         # Grab the page source
#         page_source = driver.page_source
#
#         # Save the source to a file
#         with open("toutiao_source_selenium.html", "w", encoding="utf-8") as f:
#             f.write(page_source)
#
#         return page_source
#     except Exception as e:
#         print(f"Error while fetching page source with Selenium: {e}")
#         return None
#     finally:
#         # Close the browser
#         driver.quit()

# ===================== Content extraction ==================================
# Earlier draft of toutiao_w_extract_content, kept commented out; an
# optimized version is defined further below.
# def toutiao_w_extract_content(url):
#     """
#     Extract a Toutiao page's content from Selenium-fetched HTML with BeautifulSoup.
#     """
#     html_content = get_webpage_source_selenium(url)
#
#     # Parse the HTML with BeautifulSoup
#     soup = BeautifulSoup(html_content, 'html.parser')
#
#     # Extract the title and article content
#     article_element = soup.select_one('article')
#
#     if not article_element:
#         # Try other likely selectors
#         article_element = soup.select_one('.article-content') or soup.select_one('.content')
#
#     title_element = soup.select_one('h1') or soup.select_one('.article-title')
#     title_text = title_element.get_text().strip() if title_element else ""
#     article_text = article_element.get_text().strip() if article_element else ""
#
#     # Extract image URLs
#     img_elements = article_element.select('img') if article_element else []
#     img_urls = [img.get('src') for img in img_elements if img.get('src')]
#
#     return title_text, article_text, img_urls

def toutiao_extract_content(url):
    """
    Extract a Toutiao page's title, body text and images from
    Selenium-fetched HTML using BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract the title and article content
    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Extract image URLs via the whitelist helper; a scoped CSS selector
    # ("#root > div.article-detail-container > div.main > div.show-monitor article img")
    # was tried previously.
    img_urls = extract_images_from_html(html_content)['image']

    return title_text, article_text, img_urls

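# A sketch of how the extractor is meant to be called; the URL below is a
# placeholder, not a real article:
#
#     title, text, images = toutiao_extract_content("https://www.toutiao.com/article/<id>/")
#     print(title)
#     print(f"{len(images)} images found")
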
def wechat_extract_content(url):
    """
    Extract a WeChat official-account article's title, body text and images
    from Selenium-fetched HTML using BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract the title and article content via WeChat's stable element IDs
    title_element = soup.select_one('#activity-name')
    article_element = soup.select_one('#js_content')

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Extract image URLs from the article body (keep only those starting
    # with https://mmbiz.qpic.cn)
    img_elements = article_element.select('img') if article_element else []
    img_urls = []
    for img in img_elements:
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)

    return title_text, article_text, img_urls

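# None of the extractors persist images, so here is a sketch of a download
# helper for the collected URLs. The output directory and the flat .jpg
# naming are arbitrary assumptions, and some hosts may additionally check
# the Referer header.
def download_images(img_urls, out_dir="images"):
    os.makedirs(out_dir, exist_ok=True)
    saved = []
    for i, img_url in enumerate(img_urls):
        try:
            resp = requests.get(img_url, timeout=10)
            if resp.status_code == 200:
                # Naive naming: sequential numbers with a .jpg extension
                path = os.path.join(out_dir, f"img_{i:03d}.jpg")
                with open(path, "wb") as f:
                    f.write(resp.content)
                saved.append(path)
        except Exception as e:
            print(f"Failed to download {img_url}: {e}")
    return saved
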
def wangyi_extract_content(url):
    """
    Extract a NetEase (163.com) article's title, body text and images from
    Selenium-fetched HTML using BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract the title and article content
    title_selector = '#contain > div.post_main > h1'
    article_selector = '#content > div.post_body'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Extract image URLs from the article body
    # (a scoped selector such as "#content > div.post_body > p > img" would also work)
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]

    return title_text, article_text, img_urls

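# wangyi_extract_content and souhu_extract_content return src values verbatim;
# if a page uses protocol-relative ("//...") or root-relative ("/...") paths,
# urljoin can normalize them against the page URL. A sketch, mirroring the
# path handling done inline in toutiao_w_extract_content further below.
from urllib.parse import urljoin

def absolutize_img_urls(page_url, img_urls):
    # urljoin resolves '//host/x.jpg' and '/x.jpg' relative to page_url
    return [urljoin(page_url, u) for u in img_urls]
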
def souhu_extract_content(url):
    """
    Extract a Sohu article's title, body text and images from
    Selenium-fetched HTML using BeautifulSoup.
    """
    html_content = get_webpage_source_selenium(url)

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract the title and article content
    title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
    article_selector = '#mp-editor'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Extract image URLs from the article body
    # (a scoped selector such as "#mp-editor > p > img" would also work)
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]

    return title_text, article_text, img_urls

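# The per-site extractors share one signature, so a small dispatcher can pick
# the right one from the URL's host. A sketch: the mapping below only covers
# the sites handled in this file, and the suffix matching is an assumption.
from urllib.parse import urlparse

SITE_EXTRACTORS = {
    'toutiao.com': toutiao_extract_content,
    'weixin.qq.com': wechat_extract_content,
    '163.com': wangyi_extract_content,
    'sohu.com': souhu_extract_content,
}

def extract_content(url):
    host = urlparse(url).netloc
    for domain, extractor in SITE_EXTRACTORS.items():
        if host == domain or host.endswith('.' + domain):
            return extractor(url)
    raise ValueError(f"No extractor registered for host: {host}")
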
def toutiao_w_extract_content(url):
    """
    Optimized Toutiao content extractor.
    Targets image links inside the article body specifically.
    """
    html_content = get_webpage_source_selenium(url)

    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Candidate title selectors, tried in priority order
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]

    title_text = ""
    for selector in title_selectors:
        title_element = soup.select_one(selector)
        if title_element:
            title_text = title_element.get_text().strip()
            break

    # Candidate article-content selectors, tried in priority order
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]

    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break

    # Extract images from the article body only
    img_urls = []

    if article_element:
        # Find all image elements inside the article body
        img_elements = article_element.find_all('img')

        for img in img_elements:
            # Try the various attributes an image URL may live in
            for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
                img_url = img.get(attr)
                if img_url:
                    # Normalize protocol-relative and root-relative paths
                    if img_url.startswith('//'):
                        img_url = 'https:' + img_url
                    elif img_url.startswith('/'):
                        img_url = 'https://www.toutiao.com' + img_url

                    # Keep only Toutiao-hosted image URLs
                    if any(domain in img_url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
                        img_urls.append(img_url)
                    break  # one usable URL per <img> is enough

    # Fall back to extract_images_from_html if nothing was found above
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']

    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))

    return title_text, article_text, img_urls

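# End-to-end sketch tying the pieces together: extract a Toutiao article and
# save its images with the download_images helper sketched earlier (that
# helper is an assumption of this sketch, not part of the original file).
def scrape_toutiao_article(url, out_dir="images"):
    title, text, img_urls = toutiao_w_extract_content(url)
    saved = download_images(img_urls, out_dir=out_dir) if img_urls else []
    return {"title": title, "text": text, "images": saved}
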
def get_webpage_source_selenium(url):
    """
    Enhanced Selenium page-source fetcher, tuned for Toutiao's dynamically
    loaded pages. Uses webdriver-manager to manage ChromeDriver automatically
    and keeps the driver binary inside the project directory.
    """
    import platform

    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    # Skip image downloads to speed up page loads ('--disable-images' is not a
    # real Chrome switch; the content-settings pref below is the working way)
    chrome_options.add_experimental_option(
        'prefs', {'profile.managed_default_content_settings.images': 2})
    # Note: JavaScript must stay enabled; the target pages render their
    # content with JS, which is the reason Selenium is used at all.
    chrome_options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

    # Let webdriver-manager handle ChromeDriver, caching it under ./drivers
    try:
        # Resolve the drivers directory
        project_root = os.getcwd()
        drivers_path = os.path.join(project_root, "drivers")

        # Make sure the drivers directory exists
        os.makedirs(drivers_path, exist_ok=True)

        # Check whether a compatible ChromeDriver already sits in ./drivers
        chromedriver_path = None
        if os.path.exists(drivers_path):
            for root, dirs, files in os.walk(drivers_path):
                for file in files:
                    # Look for a file named like a ChromeDriver binary
                    if file.lower().startswith("chromedriver"):
                        candidate_path = os.path.join(root, file)
                        # The candidate must be executable (or a Windows .exe)
                        if os.access(candidate_path, os.X_OK) or file.lower().endswith('.exe'):
                            # On Windows, make sure the path ends in .exe
                            if platform.system() == 'Windows' and not file.lower().endswith('.exe'):
                                candidate_path += '.exe'

                            # Confirm the resolved file actually exists
                            if os.path.exists(candidate_path):
                                chromedriver_path = candidate_path
                                break
                if chromedriver_path:
                    break

        # No compatible driver found locally: download one with webdriver-manager
        if not chromedriver_path:
            try:
                # Download a ChromeDriver matching this system into ./drivers
                from webdriver_manager.core.driver_cache import DriverCacheManager

                # Custom cache manager pointing at the project drivers directory
                cache_manager = DriverCacheManager(root_dir=drivers_path)

                # Resolve the ChromeDriver path
                chromedriver_path = ChromeDriverManager(cache_manager=cache_manager).install()
                logging.info(f"ChromeDriver downloaded to: {chromedriver_path}")
            except Exception as download_error:
                logging.warning(f"webdriver-manager failed to download ChromeDriver: {download_error}")
                # Fall back to whatever ChromeDriver is on the system PATH
                chromedriver_path = None

        # Start Chrome with the resolved ChromeDriver path
        if chromedriver_path and os.path.exists(chromedriver_path):
            try:
                service = Service(chromedriver_path)
                driver = webdriver.Chrome(service=service, options=chrome_options)
            except Exception as service_error:
                logging.warning(f"Local ChromeDriver failed: {service_error}")
                # Retry with a fresh webdriver-manager download
                try:
                    chromedriver_path = ChromeDriverManager().install()
                    service = Service(chromedriver_path)
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                except Exception as fallback_error:
                    logging.warning(f"Fresh webdriver-manager download also failed: {fallback_error}")
                    # Last resort
                    driver = webdriver.Chrome(options=chrome_options)
        else:
            # Everything else failed: try the ChromeDriver on the system PATH
            logging.warning("No compatible ChromeDriver found; trying the one on the system PATH")
            driver = webdriver.Chrome(options=chrome_options)

    except Exception as e:
        # Last resort
        logging.warning(f"ChromeDriver initialization failed, using the default setup: {e}")
        driver = webdriver.Chrome(options=chrome_options)

    try:
        driver.get(url)

        # Give the page time to load
        time.sleep(5)

        # Try to wait for the key elements
        wait = WebDriverWait(driver, 15)
        try:
            # Wait for the article title
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "h1")))
            # Wait for the article body
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "article")))
        except Exception:
            print("Timed out waiting for key elements; using the current page content")

        # Scroll through the page to trigger lazy loading
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        driver.execute_script("window.scrollTo(0, 0);")
        time.sleep(1)

        page_source = driver.page_source

        # # Save the source for debugging
        # with open("toutiao_source_enhanced.html", "w", encoding="utf-8") as f:
        #     f.write(page_source)

        return page_source

    except Exception as e:
        print(f"Error while fetching page source with the enhanced Selenium fetcher: {e}")
        return None
    finally:
        try:
            driver.quit()
        except Exception:
            pass

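# Minimal manual test entry point. A sketch: the URL is a placeholder and
# must be replaced with a real article link before running.
if __name__ == "__main__":
    test_url = "https://www.toutiao.com/article/<article-id>/"  # placeholder
    title, text, images = toutiao_w_extract_content(test_url)
    print(f"Title: {title}")
    print(f"Body length: {len(text)} characters")
    print(f"Images found: {len(images)}")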