我用Python写爬虫被封了3次,改了这个参数后日抓10万条数据
年初做技术岗薪资分析,目标是某招聘平台的 Python / Java / Go 岗位数据,
预计抓 5 万条,跑了 8 小时,早上起来打开终端——满屏 403,日志显示程序
在前 22 分钟就已经被封,之后两个多小时在徒劳重试。实际入库:214 条。
那一刻我意识到,招聘平台的反爬力度远比普通电商网站强——毕竟他们的数据
是核心商业资产。
后来被封了三次,每次原因都不一样。这篇文章把三次经历完整记录下来,不只是结论,包括每次走的弯路、分析过程,以及最终跑通日抓 10 万条的完整代码。
一、第一次被封:最低级的错误
案发现场
目标是抓取某招聘平台的 Python 工程师岗位,字段包括职位名称、薪资区间、公司规模、城市、发布时间。我直接上了 requests:
# 第一版:能跑,活不过 20 分钟
import requests
from bs4 import BeautifulSoup
def get_jobs(keyword, page):
url = f"https://example-jobs.com/search?q={keyword}&page={page}"
resp = requests.get(url)
soup = BeautifulSoup(resp.text, 'html.parser')
return soup.find_all('div', class_='job-card')
for i in range(1, 500):
jobs = get_jobs('Python工程师', i)
print(f'第 {i} 页:{len(jobs)} 条')
跑了不到半小时,全是 403。
原因分析
打开抓包工具看了一眼请求头,问题一目了然:
User-Agent: python-requests/2.31.0
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
这几乎等于在请求头里写了"我是爬虫"。没有 Accept-Language,没有 Referer,UA 直接暴露了 requests 库版本,而且每个请求之间没有任何间隔,频率远超正常用户。
解决方案:完整 Header 伪装 + UA 池 + 请求限速
import requests
import random
import time
from bs4 import BeautifulSoup
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) "
"Gecko/20100101 Firefox/124.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/17.3 Safari/605.1.15",
]
def build_headers(referer=None):
return {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Referer": referer or "https://example-jobs.com/",
}
session = requests.Session()
def get_jobs(keyword, page):
url = f"https://example-jobs.com/search?q={keyword}&page={page}"
headers = build_headers(
referer=f"https://example-jobs.com/search?q={keyword}&page={page-1}"
)
resp = session.get(url, headers=headers, timeout=10)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, 'html.parser')
return soup.find_all('div', class_='job-card')
for i in range(1, 500):
jobs = get_jobs('Python工程师', i)
print(f'第 {i} 页:{len(jobs)} 条')
time.sleep(random.uniform(1.5, 3.5)) # 随机延迟,模拟人类翻页节奏
加上这些之后能跑了。但两天后又被封,这次原因完全不同。
二、第二次被封:IP 频率超限
案发现场
这次封得更彻底:直接
429 Too Many Requests,带了Retry-After: 86400,
让我等 24 小时。
原因分析
Header 伪装解决了"看起来像不像人"的问题,但没有解决"同一个 IP 请求太频繁"的问题。招聘平台通常对每个 IP 做滑动窗口限速,单 IP 每小时超过 200~300 次请求就会触发封禁。我的爬虫跑了 4 小时,换算下来单 IP 日请求接近 8000 次,早就超了。
解决方案:动态代理池
import requests
import random
import time
from threading import Lock
class ProxyPool:
def __init__(self, proxies: list):
self.proxies = proxies
self.failed = set()
self.lock = Lock()
def get(self) -> dict | None:
available = [p for p in self.proxies if p not in self.failed]
if not available:
return None
proxy = random.choice(available)
return {"http": proxy, "https": proxy}
def mark_failed(self, proxy: str):
with self.lock:
self.failed.add(proxy)
print(f"[代理池] 剔除失效代理: {proxy},"
f"剩余: {len(self.proxies) - len(self.failed)}")
def health_check(self, test_url="https://httpbin.org/ip", timeout=5):
valid = []
for proxy in self.proxies:
try:
r = requests.get(
test_url,
proxies={"http": proxy, "https": proxy},
timeout=timeout
)
if r.status_code == 200:
valid.append(proxy)
except Exception:
pass
self.proxies = valid
print(f"[代理池] 健康检测完成,可用代理: {len(valid)} 个")
def scrape_with_proxy(url, pool: ProxyPool, retries=3):
for attempt in range(retries):
proxy = pool.get()
if proxy is None:
raise RuntimeError("代理池已耗尽")
try:
resp = requests.get(
url,
headers=build_headers(),
proxies=proxy,
timeout=10
)
if resp.status_code == 429:
pool.mark_failed(list(proxy.values()))
time.sleep(2 ** attempt)
continue
resp.raise_for_status()
return resp
except requests.ProxyError:
pool.mark_failed(list(proxy.values()))
except requests.Timeout:
time.sleep(1)
return None
proxies = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
]
pool = ProxyPool(proxies)
pool.health_check()
关于代理来源:免费代理可用率通常低于 20%,生产环境建议用付费住宅代理,
可用率能稳定在 90% 以上。
这次跑了三天没出问题。直到换了目标平台,遇到了真正的硬骨头。
三、第三次被封:遇上了 Cloudflare
案发现场
新目标上了 Cloudflare 防护,访问直接触发"正在检查您的浏览器…"五秒盾,
或者直接Error 1020: Access denied。
代理换了没用,Header 换了没用,Session 也没用。
还有额外的拦截:未登录状态只能看前 3 页;同一账号多 IP 登录会触发风控。
原因分析
Cloudflare 的检测机制分三层:
第一层:JS Challenge(五秒盾)
返回一个 HTML 页面,让浏览器执行 JavaScript 计算答案后再发请求。普通 requests 根本不执行 JS,自然通不过。
第二层:TLS 指纹检测
每个 HTTP 客户端建立 TLS 连接时,会发送特定的握手特征(JA3 指纹),Python requests 和真实浏览器的 TLS 指纹完全不同,Cloudflare 一眼识别。
第三层:Turnstile 人机验证
通过行为分析和浏览器环境检测判断是否是真人,最难绕过。
三层对应三套解法:
解法一:cloudscraper 解决 JS Challenge
# pip install cloudscraper
import cloudscraper
scraper = cloudscraper.create_scraper(
browser={'browser': 'chrome', 'platform': 'windows', 'mobile': False}
)
resp = scraper.get('https://cloudflare-protected-jobs.com')
print(resp.status_code)
解法二:curl_cffi 解决 TLS 指纹检测
# pip install curl_cffi
from curl_cffi import requests as curl_requests
import cloudscraper
# 第一步:用 cloudscraper 拿到 cf_clearance Cookie
cf_scraper = cloudscraper.create_scraper()
cf_scraper.get("https://example-jobs.com")
cookies = cf_scraper.cookies.get_dict()
ua = cf_scraper.headers.get('User-Agent')
# 第二步:用 curl_cffi + Cookie 发实际请求,TLS 指纹正确
resp = curl_requests.get(
"https://example-jobs.com/search?q=Python工程师&page=1",
impersonate="chrome122",
cookies=cookies,
headers={"User-Agent": ua}
)
解法三:nodriver 解决 Turnstile 验证
# pip install nodriver
import nodriver as uc
import asyncio
async def scrape_with_nodriver(url):
browser = await uc.start(
headless=False,
browser_args=['--no-sandbox',
'--disable-blink-features=AutomationControlled']
)
page = await browser.get(url)
await page.wait_for(selector='div.job-list', timeout=10)
content = await page.get_content()
await browser.stop()
return content
content = asyncio.run(
scrape_with_nodriver('https://example-jobs.com/search?q=Python工程师')
)
注意:
undetected-chromedriver已于 2025 年 2 月停止维护,
新项目不要再用,直接换nodriver。
账号池处理登录态问题:
ACCOUNTS = [
{"cookie": "sessionid=abc123; _token=xxx", "ua": USER_AGENTS},
{"cookie": "sessionid=def456; _token=yyy", "ua": USER_AGENTS},[1]
# 建议准备 5~10 个账号轮换
]
def build_headers_with_account(account):
return {
"User-Agent": account["ua"],
"Cookie": account["cookie"],
"Referer": "https://example-jobs.com/",
"Accept-Language": "zh-CN,zh;q=0.9",
}
四、最终方案:日抓 10 万条的完整架构
把所有方案整合进一套异步并发框架,是跑出稳定数据量的关键。
完整生产级代码
import asyncio
import random
import time
import sqlite3
import pymysql
from curl_cffi.requests import AsyncSession
from bs4 import BeautifulSoup
from pybloom_live import ScalableBloomFilter
# ── 配置 ─────────────────────────────────────────────
CONCURRENCY = 50
REQUEST_DELAY = (0.5, 1.5)
MAX_RETRIES = 3
PROXY_LIST = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
]
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
]
# ── MySQL 存储 ────────────────────────────────────────
def get_db_conn():
return pymysql.connect(
host='127.0.0.1', port=3306,
user='root', password='yourpassword',
database='jobs_db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def init_table():
conn = get_db_conn()
with conn.cursor() as cur:
cur.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
job_id VARCHAR(64) UNIQUE NOT NULL,
title VARCHAR(200),
salary VARCHAR(50),
company VARCHAR(200),
city VARCHAR(50),
experience VARCHAR(50),
education VARCHAR(50),
tags VARCHAR(500),
published VARCHAR(50),
source_url VARCHAR(500),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
''')
conn.commit()
conn.close()
def save_jobs(jobs: list):
if not jobs:
return
conn = get_db_conn()
sql = '''
INSERT IGNORE INTO jobs
(job_id, title, salary, company, city,
experience, education, tags, published, source_url)
VALUES
(%(job_id)s, %(title)s, %(salary)s, %(company)s, %(city)s,
%(experience)s, %(education)s, %(tags)s, %(published)s, %(source_url)s)
'''
with conn.cursor() as cur:
cur.executemany(sql, jobs)
conn.commit()
conn.close()
# ── 去重过滤器 ────────────────────────────────────────
bloom = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)
# ── 断点续爬 ──────────────────────────────────────────
def init_state_db():
conn = sqlite3.connect('scraper_state.db')
conn.execute('''CREATE TABLE IF NOT EXISTS done_urls
(url TEXT PRIMARY KEY, ts INTEGER)''')
conn.commit()
return conn
def is_done(conn, url):
return conn.execute(
'SELECT 1 FROM done_urls WHERE url=?', (url,)
).fetchone() is not None
def mark_done(conn, url):
conn.execute('INSERT OR IGNORE INTO done_urls VALUES (?,?)',
(url, int(time.time())))
conn.commit()
# ── 请求 ──────────────────────────────────────────────
async def fetch(session, url, proxy):
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Referer": "https://example-jobs.com/",
}
for attempt in range(MAX_RETRIES):
try:
resp = await session.get(
url, headers=headers, proxy=proxy,
impersonate="chrome122", timeout=15
)
if resp.status_code == 200:
return resp.text
if resp.status_code in (403, 429):
proxy = random.choice(PROXY_LIST)
await asyncio.sleep(2 ** attempt)
except Exception:
await asyncio.sleep(1)
return None
# ── 解析 ──────────────────────────────────────────────
def parse_jobs(html, url):
soup = BeautifulSoup(html, 'html.parser')
jobs = []
for card in soup.select('div.job-card'):
job_id = card.get('data-jobid', '')
if not job_id or job_id in bloom:
continue
bloom.add(job_id)
jobs.append({
'job_id': job_id,
'title': card.select_one('.job-title').get_text(strip=True),
'salary': card.select_one('.salary').get_text(strip=True),
'company': card.select_one('.company-name').get_text(strip=True),
'city': card.select_one('.location').get_text(strip=True),
'experience': card.select_one('.exp').get_text(strip=True),
'education': card.select_one('.edu').get_text(strip=True),
'tags': ','.join(t.get_text() for t in card.select('.tag')),
'published': card.select_one('.pub-date').get_text(strip=True),
'source_url': url,
})
return jobs
# ── Worker ────────────────────────────────────────────
total_count = 0
async def worker(queue, session, state_conn, worker_id):
global total_count
while True:
url = await queue.get()
try:
if is_done(state_conn, url):
continue
proxy = random.choice(PROXY_LIST)
html = await fetch(session, url, proxy)
if html:
jobs = parse_jobs(html, url)
save_jobs(jobs)
mark_done(state_conn, url)
total_count += len(jobs)
print(f"[Worker-{worker_id}] {len(jobs)} 条 | "
f"累计: {total_count} | {url}")
await asyncio.sleep(random.uniform(*REQUEST_DELAY))
finally:
queue.task_done()
# ── 主入口 ────────────────────────────────────────────
async def main():
init_table()
state_conn = init_state_db()
queue = asyncio.Queue()
CITIES = ['beijing', 'shanghai', 'shenzhen', 'hangzhou', 'chengdu']
KEYWORDS = ['Python工程师', 'Java工程师', 'Go工程师',
'前端工程师', '算法工程师', '数据工程师']
urls = [
f"https://example-jobs.com/search?city={city}&q={kw}&page={page}"
for city in CITIES
for kw in KEYWORDS
for page in range(1, 101)
]
# 5城市 × 6岗位 × 100页 = 3000个URL,每页约30条,理论总量 ≈ 9万条
for url in urls:
await queue.put(url)
async with AsyncSession() as session:
workers = [
asyncio.create_task(worker(queue, session, state_conn, i))
for i in range(CONCURRENCY)
]
await queue.join()
for w in workers:
w.cancel()
print(f"\n全部完成,共入库 {total_count} 条数据")
asyncio.run(main())
五、避坑清单
请求层
- UA 池覆盖 Chrome、Firefox、Safari 主流版本,每次请求随机选取
- 携带完整请求头:
Accept、Accept-Language、Accept-Encoding、Referer - 请求间隔加随机抖动,不要用固定延迟
- 使用
Session保持 Cookie,模拟真实会话
IP 层
- 代理池启动时做健康检测,剔除不可用 IP
- 遇到 403/429 自动切换代理,不要在同一 IP 上反复重试
- 生产环境用付费住宅代理,免费代理可用率不稳定
- 单 IP 每小时请求量控制在目标网站阈值的 60% 以内
反爬层
- 遇到 Cloudflare JS Challenge:用
cloudscraper - 遇到 TLS 指纹检测:换
curl_cffi,指定impersonate参数 - 遇到 Turnstile 人机验证:用
nodriver真实浏览器自动化 undetected-chromedriver已停止维护,不要再用
架构层
- 用
asyncio异步并发,比多线程资源消耗小得多 - 断点续爬必须做,任务状态持久化到 SQLite 或 Redis
- 数据去重用 BloomFilter,比
set内存占用低 10 倍以上 - 数据库写入用
executemany批量插入,性能比逐条插入高 20 倍
合规层
- 爬取前检查
robots.txt,尊重Disallow规则 - 数据仅用于个人研究或合规业务,不做二次分发
- 爬取频率不影响目标网站正常服务
写在最后
招聘数据是程序员最有动力去爬的一类数据——毕竟和自己的钱包直接挂钩。
用这套方案跑完之后,我做了一张各城市 Python 岗位薪资分布图,发现杭州
的中位数薪资已经超过北京,把这个结论发在群里,直接炸出来一堆人让我把数据共享出来。
回头看这三次被封,每次都是一次强制升级。第一次教会我基本伪装,第二次让我搞清楚 IP 管理,第三次把我逼着去研究 TLS 和浏览器指纹。真正让爬虫稳定跑起来的,不是某一个参数,而是把请求伪装、IP 管理、反爬对抗、并发架构这几个环节都做扎实,缺哪一块都会在某个地方翻车。
如果这篇文章帮你少踩了一个坑,转发给同样在爬数据的朋友,也许他能少走些弯路。
- 点赞
- 收藏
- 关注作者
评论(0)