我用Python写爬虫被封了3次,改了这个参数后日抓10万条数据

举报
张胜凡 发表于 2026/04/04 13:49:45 2026/04/04
【摘要】 程序员必看:从 IP 封禁到 User-Agent 反爬,再到 Cloudflare 人机验证,我踩过的所有坑一次性说清

年初做技术岗薪资分析,目标是某招聘平台的 Python / Java / Go 岗位数据,
预计抓 5 万条,跑了 8 小时,早上起来打开终端——满屏 403,日志显示程序
在前 22 分钟就已经被封,之后两个多小时在徒劳重试。

实际入库:214 条。

那一刻我意识到,招聘平台的反爬力度远比普通电商网站强——毕竟他们的数据
是核心商业资产。

后来被封了三次,每次原因都不一样。这篇文章把三次经历完整记录下来,不只是结论,包括每次走的弯路、分析过程,以及最终跑通日抓 10 万条的完整代码。


一、第一次被封:最低级的错误

案发现场

目标是抓取某招聘平台的 Python 工程师岗位,字段包括职位名称、薪资区间、公司规模、城市、发布时间。我直接上了 requests

# 第一版:能跑,活不过 20 分钟
import requests
from bs4 import BeautifulSoup

def get_jobs(keyword, page):
    url = f"https://example-jobs.com/search?q={keyword}&page={page}"
    resp = requests.get(url)
    soup = BeautifulSoup(resp.text, 'html.parser')
    return soup.find_all('div', class_='job-card')

for i in range(1, 500):
    jobs = get_jobs('Python工程师', i)
    print(f'第 {i} 页:{len(jobs)} 条')

跑了不到半小时,全是 403。

原因分析

打开抓包工具看了一眼请求头,问题一目了然:

User-Agent: python-requests/2.31.0
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive

这几乎等于在请求头里写了"我是爬虫"。没有 Accept-Language,没有 Referer,UA 直接暴露了 requests 库版本,而且每个请求之间没有任何间隔,频率远超正常用户。

解决方案:完整 Header 伪装 + UA 池 + 请求限速

import requests
import random
import time
from bs4 import BeautifulSoup

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) "
    "Gecko/20100101 Firefox/124.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/17.3 Safari/605.1.15",
]

def build_headers(referer=None):
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;"
                  "q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Referer": referer or "https://example-jobs.com/",
    }

session = requests.Session()

def get_jobs(keyword, page):
    url = f"https://example-jobs.com/search?q={keyword}&page={page}"
    headers = build_headers(
        referer=f"https://example-jobs.com/search?q={keyword}&page={page-1}"
    )
    resp = session.get(url, headers=headers, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    return soup.find_all('div', class_='job-card')

for i in range(1, 500):
    jobs = get_jobs('Python工程师', i)
    print(f'第 {i} 页:{len(jobs)} 条')
    time.sleep(random.uniform(1.5, 3.5))  # 随机延迟,模拟人类翻页节奏

加上这些之后能跑了。但两天后又被封,这次原因完全不同。


二、第二次被封:IP 频率超限

案发现场

这次封得更彻底:直接 429 Too Many Requests,带了 Retry-After: 86400
让我等 24 小时。

原因分析

Header 伪装解决了"看起来像不像人"的问题,但没有解决"同一个 IP 请求太频繁"的问题。招聘平台通常对每个 IP 做滑动窗口限速,单 IP 每小时超过 200~300 次请求就会触发封禁。我的爬虫跑了 4 小时,换算下来单 IP 日请求接近 8000 次,早就超了。

解决方案:动态代理池

import requests
import random
import time
from threading import Lock

class ProxyPool:
    def __init__(self, proxies: list):
        self.proxies = proxies
        self.failed = set()
        self.lock = Lock()

    def get(self) -> dict | None:
        available = [p for p in self.proxies if p not in self.failed]
        if not available:
            return None
        proxy = random.choice(available)
        return {"http": proxy, "https": proxy}

    def mark_failed(self, proxy: str):
        with self.lock:
            self.failed.add(proxy)
            print(f"[代理池] 剔除失效代理: {proxy},"
                  f"剩余: {len(self.proxies) - len(self.failed)}")

    def health_check(self, test_url="https://httpbin.org/ip", timeout=5):
        valid = []
        for proxy in self.proxies:
            try:
                r = requests.get(
                    test_url,
                    proxies={"http": proxy, "https": proxy},
                    timeout=timeout
                )
                if r.status_code == 200:
                    valid.append(proxy)
            except Exception:
                pass
        self.proxies = valid
        print(f"[代理池] 健康检测完成,可用代理: {len(valid)} 个")


def scrape_with_proxy(url, pool: ProxyPool, retries=3):
    for attempt in range(retries):
        proxy = pool.get()
        if proxy is None:
            raise RuntimeError("代理池已耗尽")
        try:
            resp = requests.get(
                url,
                headers=build_headers(),
                proxies=proxy,
                timeout=10
            )
            if resp.status_code == 429:
                pool.mark_failed(list(proxy.values()))
                time.sleep(2 ** attempt)
                continue
            resp.raise_for_status()
            return resp
        except requests.ProxyError:
            pool.mark_failed(list(proxy.values()))
        except requests.Timeout:
            time.sleep(1)
    return None


proxies = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
]
pool = ProxyPool(proxies)
pool.health_check()

关于代理来源:免费代理可用率通常低于 20%,生产环境建议用付费住宅代理,
可用率能稳定在 90% 以上。

这次跑了三天没出问题。直到换了目标平台,遇到了真正的硬骨头。


三、第三次被封:遇上了 Cloudflare

案发现场

新目标上了 Cloudflare 防护,访问直接触发"正在检查您的浏览器…"五秒盾,
或者直接 Error 1020: Access denied
代理换了没用,Header 换了没用,Session 也没用。

还有额外的拦截:未登录状态只能看前 3 页;同一账号多 IP 登录会触发风控。

原因分析

Cloudflare 的检测机制分三层:

第一层:JS Challenge(五秒盾)
返回一个 HTML 页面,让浏览器执行 JavaScript 计算答案后再发请求。普通 requests 根本不执行 JS,自然通不过。

第二层:TLS 指纹检测
每个 HTTP 客户端建立 TLS 连接时,会发送特定的握手特征(JA3 指纹),Python requests 和真实浏览器的 TLS 指纹完全不同,Cloudflare 一眼识别。

第三层:Turnstile 人机验证
通过行为分析和浏览器环境检测判断是否是真人,最难绕过。

三层对应三套解法:

解法一:cloudscraper 解决 JS Challenge

# pip install cloudscraper
import cloudscraper

scraper = cloudscraper.create_scraper(
    browser={'browser': 'chrome', 'platform': 'windows', 'mobile': False}
)
resp = scraper.get('https://cloudflare-protected-jobs.com')
print(resp.status_code)

解法二:curl_cffi 解决 TLS 指纹检测

# pip install curl_cffi
from curl_cffi import requests as curl_requests
import cloudscraper

# 第一步:用 cloudscraper 拿到 cf_clearance Cookie
cf_scraper = cloudscraper.create_scraper()
cf_scraper.get("https://example-jobs.com")
cookies = cf_scraper.cookies.get_dict()
ua = cf_scraper.headers.get('User-Agent')

# 第二步:用 curl_cffi + Cookie 发实际请求,TLS 指纹正确
resp = curl_requests.get(
    "https://example-jobs.com/search?q=Python工程师&page=1",
    impersonate="chrome122",
    cookies=cookies,
    headers={"User-Agent": ua}
)

解法三:nodriver 解决 Turnstile 验证

# pip install nodriver
import nodriver as uc
import asyncio

async def scrape_with_nodriver(url):
    browser = await uc.start(
        headless=False,
        browser_args=['--no-sandbox',
                      '--disable-blink-features=AutomationControlled']
    )
    page = await browser.get(url)
    await page.wait_for(selector='div.job-list', timeout=10)
    content = await page.get_content()
    await browser.stop()
    return content

content = asyncio.run(
    scrape_with_nodriver('https://example-jobs.com/search?q=Python工程师')
)

注意:undetected-chromedriver 已于 2025 年 2 月停止维护,
新项目不要再用,直接换 nodriver

账号池处理登录态问题:

ACCOUNTS = [
    {"cookie": "sessionid=abc123; _token=xxx", "ua": USER_AGENTS},
    {"cookie": "sessionid=def456; _token=yyy", "ua": USER_AGENTS},[1]
    # 建议准备 5~10 个账号轮换
]

def build_headers_with_account(account):
    return {
        "User-Agent": account["ua"],
        "Cookie": account["cookie"],
        "Referer": "https://example-jobs.com/",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }

四、最终方案:日抓 10 万条的完整架构

把所有方案整合进一套异步并发框架,是跑出稳定数据量的关键。

完整生产级代码

import asyncio
import random
import time
import sqlite3
import pymysql
from curl_cffi.requests import AsyncSession
from bs4 import BeautifulSoup
from pybloom_live import ScalableBloomFilter

# ── 配置 ─────────────────────────────────────────────
CONCURRENCY = 50
REQUEST_DELAY = (0.5, 1.5)
MAX_RETRIES = 3

PROXY_LIST = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
]

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
]

# ── MySQL 存储 ────────────────────────────────────────
def get_db_conn():
    return pymysql.connect(
        host='127.0.0.1', port=3306,
        user='root', password='yourpassword',
        database='jobs_db',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )

def init_table():
    conn = get_db_conn()
    with conn.cursor() as cur:
        cur.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id          BIGINT AUTO_INCREMENT PRIMARY KEY,
                job_id      VARCHAR(64) UNIQUE NOT NULL,
                title       VARCHAR(200),
                salary      VARCHAR(50),
                company     VARCHAR(200),
                city        VARCHAR(50),
                experience  VARCHAR(50),
                education   VARCHAR(50),
                tags        VARCHAR(500),
                published   VARCHAR(50),
                source_url  VARCHAR(500),
                created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        ''')
    conn.commit()
    conn.close()

def save_jobs(jobs: list):
    if not jobs:
        return
    conn = get_db_conn()
    sql = '''
        INSERT IGNORE INTO jobs
        (job_id, title, salary, company, city,
         experience, education, tags, published, source_url)
        VALUES
        (%(job_id)s, %(title)s, %(salary)s, %(company)s, %(city)s,
         %(experience)s, %(education)s, %(tags)s, %(published)s, %(source_url)s)
    '''
    with conn.cursor() as cur:
        cur.executemany(sql, jobs)
    conn.commit()
    conn.close()

# ── 去重过滤器 ────────────────────────────────────────
bloom = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)

# ── 断点续爬 ──────────────────────────────────────────
def init_state_db():
    conn = sqlite3.connect('scraper_state.db')
    conn.execute('''CREATE TABLE IF NOT EXISTS done_urls
                    (url TEXT PRIMARY KEY, ts INTEGER)''')
    conn.commit()
    return conn

def is_done(conn, url):
    return conn.execute(
        'SELECT 1 FROM done_urls WHERE url=?', (url,)
    ).fetchone() is not None

def mark_done(conn, url):
    conn.execute('INSERT OR IGNORE INTO done_urls VALUES (?,?)',
                 (url, int(time.time())))
    conn.commit()

# ── 请求 ──────────────────────────────────────────────
async def fetch(session, url, proxy):
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://example-jobs.com/",
    }
    for attempt in range(MAX_RETRIES):
        try:
            resp = await session.get(
                url, headers=headers, proxy=proxy,
                impersonate="chrome122", timeout=15
            )
            if resp.status_code == 200:
                return resp.text
            if resp.status_code in (403, 429):
                proxy = random.choice(PROXY_LIST)
                await asyncio.sleep(2 ** attempt)
        except Exception:
            await asyncio.sleep(1)
    return None

# ── 解析 ──────────────────────────────────────────────
def parse_jobs(html, url):
    soup = BeautifulSoup(html, 'html.parser')
    jobs = []
    for card in soup.select('div.job-card'):
        job_id = card.get('data-jobid', '')
        if not job_id or job_id in bloom:
            continue
        bloom.add(job_id)
        jobs.append({
            'job_id':     job_id,
            'title':      card.select_one('.job-title').get_text(strip=True),
            'salary':     card.select_one('.salary').get_text(strip=True),
            'company':    card.select_one('.company-name').get_text(strip=True),
            'city':       card.select_one('.location').get_text(strip=True),
            'experience': card.select_one('.exp').get_text(strip=True),
            'education':  card.select_one('.edu').get_text(strip=True),
            'tags':       ','.join(t.get_text() for t in card.select('.tag')),
            'published':  card.select_one('.pub-date').get_text(strip=True),
            'source_url': url,
        })
    return jobs

# ── Worker ────────────────────────────────────────────
total_count = 0

async def worker(queue, session, state_conn, worker_id):
    global total_count
    while True:
        url = await queue.get()
        try:
            if is_done(state_conn, url):
                continue
            proxy = random.choice(PROXY_LIST)
            html = await fetch(session, url, proxy)
            if html:
                jobs = parse_jobs(html, url)
                save_jobs(jobs)
                mark_done(state_conn, url)
                total_count += len(jobs)
                print(f"[Worker-{worker_id}] {len(jobs)} 条 | "
                      f"累计: {total_count} | {url}")
            await asyncio.sleep(random.uniform(*REQUEST_DELAY))
        finally:
            queue.task_done()

# ── 主入口 ────────────────────────────────────────────
async def main():
    init_table()
    state_conn = init_state_db()
    queue = asyncio.Queue()

    CITIES = ['beijing', 'shanghai', 'shenzhen', 'hangzhou', 'chengdu']
    KEYWORDS = ['Python工程师', 'Java工程师', 'Go工程师',
                '前端工程师', '算法工程师', '数据工程师']

    urls = [
        f"https://example-jobs.com/search?city={city}&q={kw}&page={page}"
        for city in CITIES
        for kw in KEYWORDS
        for page in range(1, 101)
    ]
    # 5城市 × 6岗位 × 100页 = 3000个URL,每页约30条,理论总量 ≈ 9万条

    for url in urls:
        await queue.put(url)

    async with AsyncSession() as session:
        workers = [
            asyncio.create_task(worker(queue, session, state_conn, i))
            for i in range(CONCURRENCY)
        ]
        await queue.join()
        for w in workers:
            w.cancel()

    print(f"\n全部完成,共入库 {total_count} 条数据")

asyncio.run(main())

五、避坑清单

请求层

  • UA 池覆盖 Chrome、Firefox、Safari 主流版本,每次请求随机选取
  • 携带完整请求头:AcceptAccept-LanguageAccept-EncodingReferer
  • 请求间隔加随机抖动,不要用固定延迟
  • 使用 Session 保持 Cookie,模拟真实会话

IP 层

  • 代理池启动时做健康检测,剔除不可用 IP
  • 遇到 403/429 自动切换代理,不要在同一 IP 上反复重试
  • 生产环境用付费住宅代理,免费代理可用率不稳定
  • 单 IP 每小时请求量控制在目标网站阈值的 60% 以内

反爬层

  • 遇到 Cloudflare JS Challenge:用 cloudscraper
  • 遇到 TLS 指纹检测:换 curl_cffi,指定 impersonate 参数
  • 遇到 Turnstile 人机验证:用 nodriver 真实浏览器自动化
  • undetected-chromedriver 已停止维护,不要再用

架构层

  • asyncio 异步并发,比多线程资源消耗小得多
  • 断点续爬必须做,任务状态持久化到 SQLite 或 Redis
  • 数据去重用 BloomFilter,比 set 内存占用低 10 倍以上
  • 数据库写入用 executemany 批量插入,性能比逐条插入高 20 倍

合规层

  • 爬取前检查 robots.txt,尊重 Disallow 规则
  • 数据仅用于个人研究或合规业务,不做二次分发
  • 爬取频率不影响目标网站正常服务

写在最后

招聘数据是程序员最有动力去爬的一类数据——毕竟和自己的钱包直接挂钩。

用这套方案跑完之后,我做了一张各城市 Python 岗位薪资分布图,发现杭州
的中位数薪资已经超过北京,把这个结论发在群里,直接炸出来一堆人让我把数据共享出来。

回头看这三次被封,每次都是一次强制升级。第一次教会我基本伪装,第二次让我搞清楚 IP 管理,第三次把我逼着去研究 TLS 和浏览器指纹。真正让爬虫稳定跑起来的,不是某一个参数,而是把请求伪装、IP 管理、反爬对抗、并发架构这几个环节都做扎实,缺哪一块都会在某个地方翻车。

如果这篇文章帮你少踩了一个坑,转发给同样在爬数据的朋友,也许他能少走些弯路。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。