from bs4 import BeautifulSoup
import time
import random
import requests
import os
import logging
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service


def extract_images_from_html(html_content):
    """Collect all image URLs starting with https://p3-sign.toutiaoimg.com/tos-cn-i from raw HTML."""
    soup = BeautifulSoup(html_content, 'html.parser')
    img_tags = soup.find_all('img')
    img_urls = []
    for img in img_tags:
        for attr in ['src', 'data-src']:
            url = img.get(attr)
            if url and url.startswith("https://p3-sign.toutiaoimg.com/tos-cn-i"):
                img_urls.append(url)
    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))
    # Return as a JSON-style dict
    return {"image": img_urls}
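
# --- Illustrative sketch (not used by the pipeline) --------------------------
# A minimal check of what extract_images_from_html returns. The image URLs
# below are hypothetical placeholders that merely match the
# "https://p3-sign.toutiaoimg.com/tos-cn-i" prefix filter above.
def _demo_extract_images():
    sample_html = """
    <article>
      <img src="https://p3-sign.toutiaoimg.com/tos-cn-i/demo-a.jpg">
      <img data-src="https://p3-sign.toutiaoimg.com/tos-cn-i/demo-b.jpg">
      <img src="https://example.com/skipped.png">
    </article>
    """
    # Expected: the two toutiaoimg URLs, in order; the example.com image is
    # filtered out, and duplicates would be dropped.
    return extract_images_from_html(sample_html)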

# ============================================================

def get_webpage_source(url):
    """
    Fetch a page's HTML source with requests (static pages only).
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    # Explicitly bypass any proxies configured in the environment
    proxies = {
        'http': None,
        'https': None,
    }
    try:
        # Random delay to mimic human browsing
        time.sleep(random.uniform(1, 3))
        response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
        response.encoding = 'utf-8'
        # Check the response status
        if response.status_code == 200:
            return response.text
        else:
            print(f"Request failed, status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching page source: {e}")
        return None


def get_webpage_source_selenium(url, timeout=30):
    """
    Fetch a page's HTML source with Selenium, for sites that load
    content dynamically. Fixes earlier resource-leak and
    exception-handling issues.
    """
    from contextlib import contextmanager

    @contextmanager
    def get_driver():
        """Context manager that guarantees the WebDriver is shut down."""
        driver = None
        service = None
        try:
            # Configure Chrome options
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-blink-features=AutomationControlled')
            # Skip image downloads to speed up page loads
            chrome_options.add_argument('--blink-settings=imagesEnabled=false')
            # Note: JavaScript stays enabled; these sites render their
            # articles client-side, which is why Selenium is used at all.
            chrome_options.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

            def get_chromedriver_path():
                """Locate chromedriver, auto-detecting 32- vs 64-bit architecture."""
                import sys
                import platform

                # Temp dir when running from a PyInstaller bundle
                if getattr(sys, 'frozen', False):
                    base_path = sys._MEIPASS
                else:
                    base_path = os.path.dirname(os.path.abspath(__file__))

                # Detect system architecture
                is_64bit = platform.machine().endswith('64')
                driver_name = 'chromedriver64.exe' if is_64bit else 'chromedriver32.exe'
                logging.info(f"Detected {'64-bit' if is_64bit else '32-bit'} architecture; will use {driver_name}")

                # Candidate driver locations
                possible_paths = [
                    os.path.join(base_path, 'drivers', driver_name),
                    os.path.join(base_path, 'drivers', 'chromedriver.exe'),  # fallback
                    os.path.join(base_path, driver_name),
                    os.path.join(os.path.dirname(sys.executable), 'drivers', driver_name),
                    os.path.join(os.path.dirname(sys.executable), driver_name),
                ]
                for path in possible_paths:
                    if os.path.exists(path):
                        logging.info(f"Found chromedriver: {path}")
                        return path
                return None

            # Prefer a local driver; fail loudly if none is found
            driver_path = get_chromedriver_path()
            if driver_path:
                service = Service(driver_path)
                logging.info(f"Using local chromedriver: {driver_path}")
            else:
                error_msg = "chromedriver not found; run `python setup_driver.py` to download it"
                logging.error(error_msg)
                raise FileNotFoundError(error_msg)

            # Initialize the WebDriver
            driver = webdriver.Chrome(service=service, options=chrome_options)
            driver.set_page_load_timeout(timeout)  # page-load timeout
            driver.implicitly_wait(10)  # implicit wait
            yield driver
        except Exception as e:
            logging.error(f"WebDriver initialization failed: {e}")
            raise
        finally:
            # Make sure both the driver and its service are shut down
            try:
                if driver:
                    driver.quit()
            except Exception as e:
                logging.error(f"Error quitting WebDriver: {e}")
            try:
                if service:
                    service.stop()
            except AttributeError:
                pass
            except Exception as e:
                logging.error(f"Error stopping WebDriver service: {e}")

    try:
        with get_driver() as driver:
            driver.get(url)
            # Random delay to let the page settle
            time.sleep(random.uniform(2, 4))
            # Wait for the article element if possible
            try:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "article"))
                )
            except Exception as e:
                logging.warning(f"Timed out waiting for <article>: {e}; using current page content")
            return driver.page_source
    except Exception as e:
        logging.error(f"Error fetching page source with Selenium: {e}")
        return None
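
# --- Illustrative sketch (assumption; not part of the original call flow) ----
# The two fetchers are complementary: get_webpage_source is cheap but cannot
# render JavaScript, while the Selenium path handles dynamic pages at the
# cost of speed. A hypothetical combined helper could try the cheap path
# first and fall back to the browser:
def fetch_html(url, prefer_static=False):
    if prefer_static:
        html = get_webpage_source(url)
        if html:
            return html
    # Headless-browser fallback for JS-rendered content
    return get_webpage_source_selenium(url)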

# ===================== Content extraction ==================================

def toutiao_extract_content(url):
    """
    Extract title, body text, and image URLs from a Toutiao article
    (HTML fetched via Selenium, parsed with BeautifulSoup).
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    soup = BeautifulSoup(html_content, 'html.parser')

    # Title and article-body selectors
    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Image URLs: filter the whole document for Toutiao CDN links
    img_urls = extract_images_from_html(html_content)['image']

    return title_text, article_text, img_urls


def wechat_extract_content(url):
    """
    Extract title, body text, and image URLs from a WeChat Official
    Account (mp.weixin.qq.com) article.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    soup = BeautifulSoup(html_content, 'html.parser')

    # WeChat articles expose stable element IDs for title and body
    title_element = soup.select_one('#activity-name')
    article_element = soup.select_one('#js_content')

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Keep only images hosted on https://mmbiz.qpic.cn
    img_elements = article_element.select('img') if article_element else []
    img_urls = []
    for img in img_elements:
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)

    return title_text, article_text, img_urls


def wangyi_extract_content(url):
    """
    Extract title, body text, and image URLs from a NetEase (163.com) article.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    soup = BeautifulSoup(html_content, 'html.parser')

    title_selector = '#contain > div.post_main > h1'
    article_selector = '#content > div.post_body'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Collect all images inside the article body
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]

    return title_text, article_text, img_urls


def souhu_extract_content(url):
    """
    Extract title, body text, and image URLs from a Sohu (sohu.com) article.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    soup = BeautifulSoup(html_content, 'html.parser')

    title_selector = '#article-container > div.left.main > div:nth-child(1) > div > div.text-title > h1'
    article_selector = '#mp-editor'

    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""

    # Collect all images inside the article body
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]

    return title_text, article_text, img_urls
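
# --- Illustrative sketch (assumption; the extractors above are unchanged) ----
# The site extractors share the same shape: fetch -> select title -> select
# body -> collect <img src>. A hypothetical generic version makes that shared
# structure explicit; callers would pass the site-specific selectors:
def _extract_generic(url, title_selector, article_selector):
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        return "", "", []
    soup = BeautifulSoup(html_content, 'html.parser')
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)
    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]
    return title_text, article_text, img_urls
# e.g. wangyi_extract_content(url) behaves like
# _extract_generic(url, '#contain > div.post_main > h1', '#content > div.post_body')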

def toutiao_w_extract_content(url):
    """
    Optimized Toutiao extractor: tries several selectors in priority order
    and only collects images that appear inside the article body.
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("Failed to fetch HTML content")
        return "", "", []

    soup = BeautifulSoup(html_content, 'html.parser')

    # Title selectors, tried in priority order
    title_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1'
    ]
    title_text = ""
    for selector in title_selectors:
        title_element = soup.select_one(selector)
        if title_element:
            title_text = title_element.get_text().strip()
            break

    # Article-body selectors, tried in priority order
    article_selectors = [
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]'
    ]
    article_text = ""
    article_element = None
    for selector in article_selectors:
        article_element = soup.select_one(selector)
        if article_element:
            article_text = article_element.get_text().strip()
            break

    # Only collect images found inside the article body
    img_urls = []
    if article_element:
        img_elements = article_element.find_all('img')
        for img in img_elements:
            # Try the common lazy-loading URL attributes
            for attr in ['src', 'data-src', 'data-original', 'data-lazy-src']:
                img_url = img.get(attr)
                if img_url:
                    # Normalize protocol-relative and site-relative paths
                    if img_url.startswith('//'):
                        img_url = 'https:' + img_url
                    elif img_url.startswith('/'):
                        img_url = 'https://www.toutiao.com' + img_url
                    # Keep only Toutiao-related image hosts
                    if any(domain in img_url for domain in ['toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com']):
                        img_urls.append(img_url)
                    break  # stop at the first usable attribute

    # Fall back to the document-wide filter if nothing was found
    if not img_urls:
        extracted_imgs = extract_images_from_html(html_content)
        if extracted_imgs and 'image' in extracted_imgs:
            img_urls = extracted_imgs['image']

    # Deduplicate while preserving order
    img_urls = list(dict.fromkeys(img_urls))

    return title_text, article_text, img_urls


def extract_content_with_retry(url, max_retries=3):
    """
    Generic content extraction with a retry loop.

    Args:
        url: article URL
        max_retries: maximum number of attempts

    Returns:
        tuple: (title, content, images)
    """

    def _extract_by_domain(url):
        """Dispatch to the extractor that matches the URL's domain."""
        if "toutiao.com" in url:
            return toutiao_extract_content(url)
        elif "mp.weixin.qq.com" in url:
            return wechat_extract_content(url)
        elif "163.com" in url:
            return wangyi_extract_content(url)
        elif "sohu.com" in url:
            return souhu_extract_content(url)
        else:
            # No generic extractor implemented yet
            return "", "", []

    for attempt in range(max_retries):
        try:
            title, content, images = _extract_by_domain(url)
            # Treat an all-empty result as a failed attempt
            if title or content or images:
                return title, content, images
            else:
                logging.warning(f"Attempt {attempt + 1} returned empty results, URL: {url}")
        except Exception as e:
            logging.error(f"Attempt {attempt + 1} failed: {e}")

        # Back off before retrying
        if attempt < max_retries - 1:
            time.sleep(random.uniform(1, 3))

    # All retries exhausted
    logging.error(f"Content extraction failed after {max_retries} attempts: {url}")
    return "", "", []
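
# --- Usage sketch -------------------------------------------------------------
# Minimal manual test harness; the URL below is a hypothetical placeholder,
# and a local chromedriver (see get_chromedriver_path) must be available.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    test_url = "https://www.toutiao.com/article/1234567890123456789/"  # placeholder
    title, content, images = extract_content_with_retry(test_url, max_retries=2)
    print(f"title: {title!r}")
    print(f"content length: {len(content)}")
    print(f"images found: {len(images)}")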