
目标网站的反爬手段已从基础的 UA 检测、IP 封禁,升级为动态验证码、Cookie 验证、行为特征分析(如访问频率、点击轨迹)、JS 加密参数生成等。单一的请求伪装已无法突破多层防护,采集脚本的稳定性直接决定数据获取效率。
海量文档采集场景中,重复抓取历史数据会占用带宽、增加服务器压力,甚至触发反爬阈值。如何精准识别新增 / 更新文档、仅抓取变化数据,是降低采集成本、提升效率的核心问题。
面对网络波动、临时封禁、文档格式异常等问题,脚本需具备断点续传、失败重试、异常捕获能力,否则单次故障可能导致批量数据丢失。
反爬应对的核心逻辑是 “模拟正常用户行为 + 分散风险 + 动态适配”,以下是关键技术点及 Python 实现:
基础反爬的核心是让请求头接近真实浏览器,需动态配置 User-Agent、Referer、Accept 等参数,同时处理 Cookie 的持久化与更新。
python
运行
import requests
import random
from fake_useragent import UserAgent
# 初始化UserAgent池
ua = UserAgent()
# Cookie池(可从浏览器抓取或通过登录接口获取)
COOKIE_POOL = [
"sessionid=xxx; csrftoken=xxx",
"sessionid=yyy; csrftoken=yyy"
]
def get_headers():
"""生成随机请求头"""
headers = {
"User-Agent": ua.random,
"Referer": "https://target-site.com", # 模拟来源页
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Cookie": random.choice(COOKIE_POOL)
}
return headers
# 测试请求
def test_request(url):
try:
response = requests.get(
url,
headers=get_headers(),
timeout=10,
proxies={"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"} # 代理IP
)
response.raise_for_status() # 抛出HTTP错误
return response.text
except requests.exceptions.RequestException as e:
print(f"请求失败:{e}")
return None
if __name__ == "__main__":
test_url = "https://target-site.com/doc/1"
print(test_request(test_url))高频次请求是触发反爬的核心特征,需通过 “随机延迟 + IP 代理池” 分散请求特征。代理池建议选用付费高匿代理(如亿牛云),避免使用公开免费代理(易被封禁)。
python
import requests
import time
import random
from requests.exceptions import ProxyError
from fake_useragent import UserAgent
# 配置指定的代理信息
proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"
# 构建带认证的代理URL(HTTP/HTTPS通用)
proxy_url = f"http://{proxyUser}:{proxyPass}@{proxyHost}:{proxyPort}"
# 代理池(替换为指定代理,如需多代理可在此列表添加更多同格式地址)
PROXY_POOL = [proxy_url]
# 初始化UserAgent池(补充缺失的UA配置)
ua = UserAgent()
def get_headers():
"""生成随机请求头(补充原代码缺失的函数)"""
headers = {
"User-Agent": ua.random,
"Referer": "https://target-site.com",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive"
}
return headers
def get_proxy():
"""随机获取代理(适配新的代理池格式)"""
return random.choice(PROXY_POOL)
def safe_request(url, max_retry=3):
"""带重试、频率控制、代理切换的安全请求"""
retry_count = 0
while retry_count < max_retry:
try:
# 随机延迟1-5秒,模拟人工操作
time.sleep(random.uniform(1, 5))
proxy = get_proxy()
# 构建代理配置(区分http和https)
proxies = {
"http": proxy,
"https": proxy
}
response = requests.get(
url,
headers=get_headers(),
proxies=proxies,
timeout=10,
verify=False # 忽略SSL证书验证(部分代理环境需开启)
)
response.raise_for_status() # 主动抛出HTTP错误
if response.status_code == 200:
return response
else:
print(f"状态码异常:{response.status_code},重试中...")
retry_count += 1
continue
except ProxyError as e:
print(f"代理{proxy}失效:{str(e)},切换代理重试...")
retry_count += 1
except requests.exceptions.HTTPError as e:
print(f"HTTP请求错误:{str(e)},重试中...")
retry_count += 1
except Exception as e:
print(f"请求异常:{str(e)},重试中...")
retry_count += 1
print(f"请求{url}失败,已达最大重试次数")
return None
# 测试示例
if __name__ == "__main__":
test_url = "https://www.baidu.com"
resp = safe_request(test_url)
if resp:
print(f"请求成功,响应状态码:{resp.status_code}")
print(f"响应内容长度:{len(resp.text)}")针对滑块验证码、点选验证码,可集成第三方识别服务(如超级鹰、阿里云验证码识别);针对 JS 加密参数(如签名、token),可通过execjs执行前端 JS 代码生成参数,或逆向分析加密逻辑后用 Python 复现。
python
import execjs
# 读取前端加密JS文件(示例:生成签名参数)
with open("encrypt.js", "r", encoding="utf-8") as f:
js_code = f.read()
ctx = execjs.compile(js_code)
def get_sign(params):
"""调用JS加密函数生成签名"""
sign = ctx.call("generateSign", params)
return sign
# 构造请求参数
params = {
"doc_id": "123456",
"timestamp": int(time.time() * 1000)
}
params["sign"] = get_sign(params)
# 带签名的请求
response = safe_request(f"https://target-site.com/api/doc?{requests.compat.urlencode(params)}")增量抓取的核心是 “记录已抓取数据标识 + 对比最新数据标识”,常用标识包括文档 ID、更新时间、MD5 哈希值等,以下以 “文档 ID + 更新时间” 为例实现增量抓取:
选用轻量级的 SQLite 存储已抓取文档的标识(海量数据可替换为 MySQL/Redis),表结构设计如下:
sql
CREATE TABLE IF NOT EXISTS crawled_docs (
doc_id TEXT PRIMARY KEY, -- 文档唯一ID
update_time TEXT, -- 文档更新时间
crawl_time TEXT -- 抓取时间
);python
运行
import sqlite3
import datetime
from bs4 import BeautifulSoup # 需安装:pip install beautifulsoup4
# 初始化数据库连接
def init_db():
conn = sqlite3.connect("crawl_docs.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS crawled_docs (
doc_id TEXT PRIMARY KEY,
update_time TEXT,
crawl_time TEXT
)
""")
conn.commit()
return conn
# 获取已抓取的文档信息
def get_crawled_docs(conn):
cursor = conn.cursor()
cursor.execute("SELECT doc_id, update_time FROM crawled_docs")
return {row[0]: row[1] for row in cursor.fetchall()}
# 增量抓取主逻辑
def incremental_crawl(list_url, conn):
# 1. 爬取文档列表页
response = safe_request(list_url)
if not response:
return
soup = BeautifulSoup(response.text, "html.parser")
# 2. 提取文档列表(需根据目标网站结构调整Selector)
doc_items = soup.select(".doc-item")
current_docs = {}
for item in doc_items:
doc_id = item.attrs.get("data-doc-id")
update_time = item.select_one(".update-time").text.strip()
current_docs[doc_id] = update_time
# 3. 对比已抓取数据,筛选增量文档
crawled_docs = get_crawled_docs(conn)
incremental_doc_ids = []
for doc_id, update_time in current_docs.items():
# 新增文档 或 文档已更新
if doc_id not in crawled_docs or crawled_docs[doc_id] != update_time:
incremental_doc_ids.append(doc_id)
print(f"本次增量文档数量:{len(incremental_doc_ids)}")
# 4. 抓取增量文档
cursor = conn.cursor()
crawl_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for doc_id in incremental_doc_ids:
doc_url = f"https://target-site.com/doc/{doc_id}"
doc_response = safe_request(doc_url)
if doc_response:
# 解析文档内容(示例:提取正文)
doc_soup = BeautifulSoup(doc_response.text, "html.parser")
doc_content = doc_soup.select_one(".doc-content").text.strip()
# 保存文档内容(实际场景可存储到文件/数据库)
with open(f"docs/{doc_id}.txt", "w", encoding="utf-8") as f:
f.write(doc_content)
# 更新数据库
cursor.execute("""
REPLACE INTO crawled_docs (doc_id, update_time, crawl_time)
VALUES (?, ?, ?)
""", (doc_id, current_docs[doc_id], crawl_time))
conn.commit()
print(f"成功抓取文档:{doc_id}")
if __name__ == "__main__":
db_conn = init_db()
target_list_url = "https://target-site.com/doc-list"
incremental_crawl(target_list_url, db_conn)
db_conn.close()在代码中加入任务队列(如 Celery),将待抓取的文档 ID 存入队列,每次抓取前检查队列状态,故障恢复后可从断点继续执行。
单 IP / 单进程采集效率有限且易被封禁,可采用多进程 + 分布式架构(如 Scrapy-Redis),将采集任务分发到多个节点,分散反爬风险。
添加日志记录(如 logging 模块),监控抓取成功率、IP 存活状态、反爬触发频率,当异常指标超过阈值时,通过邮件 / 钉钉机器人发送告警。
Python 海量文档采集的稳定性,本质是 “反爬适配能力 + 增量抓取效率 + 容错机制” 的综合体现。本文通过请求伪装、频率控制、代理切换应对反爬,通过数据库对比实现增量抓取,结合实战代码构建了一套基础的稳定采集体系。
在实际应用中,需根据目标网站的反爬强度动态调整策略:对低防护网站,可简化反爬逻辑;对高防护网站,需结合无头浏览器(如 Selenium/Playwright)模拟真实浏览器行为,或对接付费代理池、验证码识别服务。同时,增量抓取的标识选择需贴合网站特性,确保精准识别数据变化。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。