最近在学习Python爬虫技术,偶然看到一篇关于爬取美女图片的帖子,这激发了我的实践兴趣。
在测试原代码时,我发现虽然能成功爬取图片,但下载的文件均无法正常打开,且所有图片文件大小相同。经过分析,发现问题可能出在Referer的设置上。我对其进行了优化,改为根据不同的请求场景动态设置Referer,使其更符合网页的引用逻辑,从而解决了这个问题。
进一步优化时,我发现原代码只能按分页爬取,每个分页包含40套图,批量下载效率较低且缺乏选择性。为此,我实现了以下改进:
- 为每套图添加编号功能,支持按指定编号爬取
- 增加套图封面预览功能,先获取所有套图封面并编号显示
- 增强编号输入功能,支持单独编号和连续范围(如:1,3,5,7-9),请使用英文逗号分隔(程序会检测中文逗号、中文减号并给出明确提示,引导改用对应的英文半角符号)
Python源代码:
import requests
import os
import re
import time
import random
from lxml import etree
from urllib.parse import urljoin
from requests.exceptions import RequestException
def validate_and_clean_path(path: str) -> str:
    """Replace characters that are illegal in Windows folder names with '_'.

    Ensures the returned string can safely be used as a directory name.
    """
    return re.sub(r'[\\/*?:"<>|]', '_', path)
def download_cover(session: requests.Session, cover_url: str, save_path: str, referer: str) -> bool:
    """Stream a theme's cover image to save_path (original size kept).

    Sends the given Referer header with the request and accepts only
    JPEG/PNG responses. Returns True on success, False on any failure.
    """
    try:
        resp = session.get(cover_url, headers={'Referer': referer}, timeout=15, stream=True)
        resp.raise_for_status()
        content_type = resp.headers.get('Content-Type', '')
        if not content_type.startswith(('image/jpeg', 'image/png')):
            print(f" ❌ 封面格式异常(非JPG/PNG):{content_type}")
            return False
        # Re-assert the parent folder exists right before writing, even if it
        # was already created elsewhere.
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        with open(save_path, 'wb') as out_file:
            for piece in resp.iter_content(chunk_size=1024):
                if piece:
                    out_file.write(piece)
        return True
    except RequestException as e:
        print(f" ❌ 网络错误:{str(e)}(可能是Referer缺失或URL无效)")
        return False
    except Exception as e:
        print(f" ❌ 本地保存错误:{str(e)}")
        return False
def get_theme_urls(session: requests.Session, base_url: str, start_page: int, end_page: int) -> list:
    """Collect theme entries (detail URL, cleaned name, cover URL) from list pages.

    Iterates list pages start_page..end_page inclusive and scrapes each theme
    item via XPath. Returns a list of dicts with keys 'detail_url', 'name',
    'save_dir', 'cover_url' and 'referer_url' (the list page the item came
    from, later used as the Referer when downloading its cover). A failed
    page is skipped; the run continues with the next page.
    """
    themes = []
    for page in range(start_page, end_page + 1):
        try:
            # List-page URL pattern of the target site: /html/list_1_<page>.html
            list_url = f"{base_url}/html/list_1_{page}.html"
            print(f"正在获取列表页:{list_url}")
            res = session.get(list_url, timeout=15)
            res.raise_for_status()
            res.encoding = 'gbk'  # the site serves GBK-encoded pages
            parser = etree.HTML(res.text)
            # NOTE(review): positional XPath — breaks if the page layout changes.
            theme_items = parser.xpath('//body/div[5]/ul/li')
            if not theme_items:
                print(f" ⚠️ 列表页{page}未找到主题项,建议检查XPath路径")
                continue
            for item in theme_items:
                detail_link = item.xpath('./a/@href')
                if not detail_link:
                    continue
                full_detail_url = urljoin(base_url, detail_link[0])
                theme_name = item.xpath('./p/a/text()')
                if not theme_name:
                    # Fall back to a numbered placeholder when the title is missing.
                    theme_name = [f"未命名主题_{len(themes) + 1}"]
                clean_name = validate_and_clean_path(theme_name[0])
                cover_link = item.xpath('./a/img/@src')
                if not cover_link:
                    print(f" ⚠️ 未提取到封面URL,跳过主题:{clean_name}")
                    continue
                full_cover_url = urljoin(base_url, cover_link[0])
                # Add the site's domain here (adapt the other matching rules to
                # the target page as well).
                # NOTE(review): startswith('') is always True — this check is a
                # no-op until a real domain prefix is filled in.
                if not full_cover_url.startswith(''):
                    print(f" ⚠️ 封面URL格式异常,跳过:{full_cover_url}")
                    continue
                themes.append({
                    'detail_url': full_detail_url,
                    'name': clean_name,
                    'save_dir': f"pics/{clean_name}",
                    'cover_url': full_cover_url,
                    'referer_url': list_url
                })
            print(f" ✅ 列表页{page}成功提取{len(theme_items)}个主题项")
            time.sleep(random.uniform(1.5, 3.0))  # polite random delay between pages
        except RequestException as e:
            print(f"获取列表页{page}失败:{str(e)},继续下一页")
            continue
        except Exception as e:
            print(f"处理列表页{page}时出错:{str(e)}")
            continue
    print(f"\n===== 最终获取到{len(themes)}个有效主题(含可下载封面)=====")
    return themes
def get_pic_urls(session: requests.Session, theme_detail_url: str) -> list:
    """Collect every image URL of one theme across all of its detail pages.

    Reads the pager text ("共N页") on the first page to determine the page
    count, derives the paginated URLs (detail.html -> detail_2.html ...),
    and scrapes the main image from each page. Returns a list of absolute
    image URLs; empty on failure.
    """
    pic_urls = []
    try:
        res = session.get(theme_detail_url, timeout=15)
        res.raise_for_status()
        res.encoding = 'gbk'  # the site serves GBK-encoded pages
        parser = etree.HTML(res.text)
        page_info = parser.xpath('//*[@id="pages"]/a[1]/text()')
        # Default to a single page when the pager is absent or unparseable.
        total_pages = int(re.search(r'共(\d+)页', page_info[0]).group(1)) if (
            page_info and re.search(r'共(\d+)页', page_info[0])) else 1
        page_url_list = [theme_detail_url]
        if total_pages > 1:
            url_prefix = theme_detail_url.rsplit('.', 1)[0]
            # Strip an existing _N suffix so we don't build prefix_N_M.html.
            if '_' in url_prefix:
                url_prefix = url_prefix.rsplit('_', 1)[0]
            for i in range(2, total_pages + 1):
                page_url_list.append(f"{url_prefix}_{i}.html")
        for page_idx, page_url in enumerate(page_url_list, start=1):
            try:
                res_page = session.get(page_url, timeout=15)
                res_page.raise_for_status()
                res_page.encoding = 'gbk'
                page_parser = etree.HTML(res_page.text)
                # NOTE(review): positional XPath tied to the current page layout.
                img_xpath = '/html/body/div[8]/img/@src'
                current_page_imgs = page_parser.xpath(img_xpath)
                if not current_page_imgs:
                    print(f" ⚠️ 主题分页{page_idx}未找到图片,建议检查XPath")
                    continue
                full_img_urls = [urljoin(theme_detail_url, img) for img in current_page_imgs]
                pic_urls.extend(full_img_urls)
                print(f" ✅ 分页{page_idx}/{total_pages}提取到{len(full_img_urls)}张图片")
                time.sleep(random.uniform(1, 2))  # polite delay between pages
            except RequestException as e:
                print(f" ❌ 分页{page_idx}请求失败:{str(e)},跳过该页")
                continue
    except RequestException as e:
        print(f"请求主题首页失败:{str(e)}")
    except Exception as e:
        print(f"处理主题页时出错:{str(e)}")
    return pic_urls
def download_single_pic(session: requests.Session, pic_url: str, save_name: str, save_dir: str, referer: str) -> bool:
    """Stream one picture to save_dir/save_name, sending the given Referer.

    Accepts only JPEG/PNG responses; the image bytes are written as-is
    (original quality). Returns True on success, False otherwise.
    """
    try:
        resp = session.get(pic_url, headers={'Referer': referer}, timeout=20, stream=True)
        resp.raise_for_status()
        content_type = resp.headers.get('Content-Type', '')
        if not content_type.startswith(('image/jpeg', 'image/png')):
            print(f" ❌ {save_name}:格式异常({content_type})")
            return False
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, save_name)
        with open(save_path, 'wb') as out_file:
            for piece in resp.iter_content(chunk_size=4096):
                if piece:
                    out_file.write(piece)
        print(f" ✅ {save_name}:下载成功(保存至{save_path})")
        return True
    except RequestException as e:
        print(f" ❌ {save_name}:网络错误({str(e)})")
    except Exception as e:
        print(f" ❌ {save_name}:本地保存错误({str(e)})")
    return False
def check_input_symbols(input_str: str) -> tuple[bool, str]:
"""验证用户输入中的符号是否合法"""
if ',' in input_str:
return False, "❌ 发现中文逗号“,”!请替换为英文逗号“,”(半角逗号)"
if '-' in input_str:
return False, "❌ 发现中文减号“-”!请替换为英文减号“-”(半角减号)"
# 修复的正则表达式:-放在字符集结尾
illegal_chars = re.findall(r'[^0-9,\s-]', input_str)
if illegal_chars:
unique_illegal = list(set(illegal_chars))
return False, f"❌ 发现非法字符:{''.join(unique_illegal)}!仅支持数字、英文逗号“,”、英文减号“-”"
if not re.search(r'[0-9]', input_str):
return False, "❌ 未检测到有效数字!请输入编号(如1-3,6,8)"
return True, "✅ 符号格式合法"
def parse_user_selection(input_str: str, max_valid_index: int) -> list:
    """Parse a selection string like "1-3,6,8" into sorted unique indices.

    Accepts single numbers and inclusive ranges separated by commas. As a
    robustness generalization, full-width (Chinese) commas/dashes are
    normalized to their ASCII forms before parsing, so "1,3" or "7-9"
    also work. Range bounds are clamped to [1, max_valid_index]; malformed
    or out-of-range segments are reported and skipped.

    Returns the sorted list of valid 1-based indices.
    """
    selected_indices = set()
    # Normalize full-width punctuation so inputs typed with a Chinese IME parse too.
    input_str = input_str.replace(',', ',').replace('-', '-')
    input_str = input_str.strip().strip(',')
    input_str = re.sub(r',+', ',', input_str)   # collapse repeated commas
    input_str = re.sub(r'\s+', '', input_str)   # drop all whitespace
    for segment in input_str.split(','):
        if not segment:
            continue
        if '-' in segment:
            try:
                start_idx, end_idx = map(int, segment.split('-'))
                # Clamp both ends into the valid index window.
                start_idx = max(1, start_idx)
                end_idx = min(max_valid_index, end_idx)
                if start_idx <= end_idx:
                    selected_indices.update(range(start_idx, end_idx + 1))
                else:
                    print(f" ⚠️ 无效范围:{segment}(起始值大于结束值),已忽略")
            except ValueError:
                print(f" ⚠️ 范围格式错误:{segment}(正确格式如“1-3”,请勿在减号前后加空格),已忽略")
        else:
            try:
                idx = int(segment)
                if 1 <= idx <= max_valid_index:
                    selected_indices.add(idx)
                else:
                    print(f" ⚠️ 编号{idx}超出范围(有效范围1-{max_valid_index}),已忽略")
            except ValueError:
                print(f" ⚠️ 编号格式错误:{segment}(请输入纯数字),已忽略")
    return sorted(selected_indices)
def main():
    """Interactive entry point: pick list pages, preview covers, then download
    the user-selected themes by number.
    """
    target_base_url = ""  # set the main domain here, e.g. https://d.tiansj.net
    session = requests.Session()
    current_time = time.strftime("%Y%m%d_%H%M", time.localtime())
    # Placeholder page range (第0-0页) is substituted once the user picks pages.
    temp_cover_dir = f"封面批次_{current_time}_第{0}-{0}页"
    # Prompt until a valid page range is entered.
    while True:
        try:
            start_page = int(input("请输入起始页码:"))
            end_page = int(input("请输入结束页码:"))
            if start_page > 0 and end_page >= start_page:
                cover_save_dir = temp_cover_dir.replace(f"第{0}-{0}页", f"第{start_page}-{end_page}页")
                break
            print(" ❌ 输入错误:起始页码必须>0,且结束页码≥起始页码")
        except ValueError:
            print(" ❌ 输入错误:请输入整数页码")
    os.makedirs(cover_save_dir, exist_ok=True)
    print(f"\n✅ 当前批次封面将保存至:{os.path.abspath(cover_save_dir)}")
    # Browser-like default headers for every request made with this session.
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    })
    print(f"\n===== 开始获取第{start_page}-{end_page}页的主题信息 =====")
    theme_list = get_theme_urls(session, target_base_url, start_page, end_page)
    if not theme_list:
        print("未获取到任何有效主题,程序将退出")
        return
    # Phase 1: download every cover so the user can preview before choosing.
    print(f"\n===== 开始下载{len(theme_list)}个主题的封面图(保存至:{cover_save_dir})=====")
    for theme_idx, theme_info in enumerate(theme_list, start=1):
        short_theme_name = theme_info['name'][:15]  # keep cover file names short
        cover_save_path = os.path.join(cover_save_dir, f"{theme_idx}_{short_theme_name}.jpg")
        print(f"正在处理主题 {theme_idx}/{len(theme_list)}:{theme_info['name']}")
        if download_cover(session, theme_info['cover_url'], cover_save_path, theme_info['referer_url']):
            print(f" ✅ 封面已保存至:{cover_save_path}")
        else:
            print(f" ❌ 封面下载失败")
        time.sleep(random.uniform(0.8, 1.5))  # polite delay between covers
    # Phase 2: show the numbered menu of downloadable themes.
    print(f"\n===== 可下载主题列表(共{len(theme_list)}个)=====")
    for idx, theme_info in enumerate(theme_list, start=1):
        cover_path = os.path.join(cover_save_dir, f"{idx}_{theme_info['name'][:15]}.jpg")
        print(f"{idx}. 主题:{theme_info['name']} | 封面路径:{cover_path}")
    # Phase 3: read the user's selection (numbers/ranges, or 'all').
    while True:
        user_input = input("\n请输入要下载的主题编号(格式示例:1-3,6,8 或输入all下载全部):")
        user_input = user_input.strip().lower()
        if user_input == 'all':
            selected_themes = list(range(1, len(theme_list) + 1))
            print(f"✅ 已选择:全部主题(1-{len(theme_list)})")
            break
        is_valid_symbol, symbol_tip = check_input_symbols(user_input)
        if not is_valid_symbol:
            print(symbol_tip)
            print("ℹ️ 请重新输入(示例:1-3,6,8 或 all)\n")
            continue
        selected_themes = parse_user_selection(user_input, len(theme_list))
        if selected_themes:
            print(f"✅ 已选择主题:{selected_themes}")
            break
        else:
            print("❌ 未检测到有效编号!请重新输入(示例:1-3,6,8)\n")
    # Phase 4: download every picture of each selected theme.
    print(f"\n===== 开始下载选中的{len(selected_themes)}个主题图片 =====")
    for selected_idx in selected_themes:
        target_theme = theme_list[selected_idx - 1]  # user selection is 1-based
        print(f"\n=== 正在处理选中主题 {selected_idx}:{target_theme['name']} ===")
        print("正在提取图片URL...")
        theme_pic_urls = get_pic_urls(session, target_theme['detail_url'])
        if not theme_pic_urls:
            print(f" ⚠️ 未提取到任何图片URL,跳过该主题")
            continue
        print(f"共提取到{len(theme_pic_urls)}张图片,开始下载...")
        success_count = 0
        for pic_idx, pic_url in enumerate(theme_pic_urls, start=1):
            pic_save_name = f"{pic_idx}.jpg"
            # The theme's detail page serves as Referer for each image request.
            if download_single_pic(session, pic_url, pic_save_name, target_theme['save_dir'],
                                   target_theme['detail_url']):
                success_count += 1
            time.sleep(random.uniform(1.5, 3.0))  # polite delay between images
        print(
            f"主题{selected_idx}下载完成:成功{success_count}/{len(theme_pic_urls)}张 | 保存目录:{target_theme['save_dir']}")
    print("\n===== 所有选中主题的下载任务已全部完成!=====")
# Run the interactive downloader only when executed as a script.
if __name__ == "__main__":
    main()
评论 (0)