一个站点不够学?那就在用Python增加一个采集目标,一派话题广场+某金融论坛话题广场爬虫
本次的目标站点原计划是一个比较简单的站点,后来发现有点太简单了,就额外增加了一个案例,学一个赠一个,本篇博客核心用到的技术依旧是队列 queue 技术。
目标站点【一派话题广场】分析
本篇博客的第一个采集目标站点是:https://sspai.com/matrix/pods,少数派网站的一个子级栏目。
目标数据所在界面如下图所示:
通过开发者工具,不断下拉加载页面,得到的接口请求规则如下:
https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset=0&created_at=0
https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset=10&created_at=0
https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset=20&created_at=0
https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset=30&created_at=0
其中参数除 offset
变化外,其余无变化,其中 limit
参数应该为每个数据量,基于此逻辑,请求接口可以通过代码进行批量生成,实测过程发现数据量也不大,只有 6 页。
下述代码采用了后进先出队列 LifoQueue
,没有特殊原因,单纯给大家展示一下用法。
# 初始化一个队列
q = LifoQueue(maxsize=0)
# 批量生成请求地址链接
for page in range(1, 7):
# https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset=0&created_at=0
q.put('https://sspai.com/api/v1/bullet/search/page/get?type=0&limit=10&offset={}&created_at=0'.format((page-1)*10))
请求地址批量生成完毕之后,可以开始获取接口返回的数据了,具体代码如下,核心关注 main
部分内容。
# 请求头函数,可以参考之前的系列文章
def get_headers():
uas = [
"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
"Mozilla/5.0 (compatible; Baiduspider-render/2.0; +http://www.baidu.com/search/spider.html)"
]
ua = random.choice(uas)
headers = {
"user-agent": ua
}
return headers
# 储存数据
def save(text):
# 文件名使用时间戳命名
with open(f'{time.time()}.json', 'a+', encoding='utf-8') as f:
f.write(text)
print(text, "--- 保存成功")
if __name__ == "__main__":
# 判断队列是否为空,空则停止循环
while q.qsize() > 0:
# 获取一个 URL
url = q.get()
# 通知任务已经完成
q.task_done()
# 获取相应数据
res = requests.get(url=url, headers=get_headers(), timeout=10)
# 调用保存函数
save(res.text)
q.join()
print("所有任务都已完成")
因为上述案例实在太简单,连多线程都不用,所以我基于【话题广场】关键字,查询是否还有其它可用于 学习目的 相关站点,结果还真被我发现一个。
话题广场 - 集思录
集思录,一个以数据为本的投资社区,https://www.jisilu.cn/topic/。
每一个话题下面,都有很多问题,完美符合生产者消费者模型。
编码逻辑分析
本案例中生产函数用于产生列表页,消费函数用于抓取内页 标题
,作者
,链接
。
由于在列表页之前,还存在一个层级-【热门话题】,所以需要提前准备好待抓取队列。
# 热门话题列表页待抓取链接
hot_subjects = Queue(maxsize=0)
for i in range(1, 11):
url = f'https://www.jisilu.cn/topic/square/id-hot__feature_id-__page-{i}'
hot_subjects.put(url)
接下来生产函数用于产生列表页数据,到提前初始化好的 q_data_ids
队列中,其中 get_headers
函数,参考前文即可。
def get_headers():
uas = [
"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
"Mozilla/5.0 (compatible; Baiduspider-render/2.0; +http://www.baidu.com/search/spider.html)"
]
ua = random.choice(uas)
headers = {
"user-agent": ua
}
return headers
# 初始化一个队列
q_data_ids = Queue(maxsize=0)
# 生产函数,用于产生话题列表页 URL
def producer():
while hot_subjects.qsize() > 0:
# 取得一个分类页请求地址
list_url = hot_subjects.get()
hot_subjects.task_done()
print("正在解析:", list_url)
# 获取话题列表页相应数据
res = requests.get(list_url, headers=get_headers(), timeout=3)
# 解析,获取话题详情页进入的关键参数 `data-id`,代码后有对于该关键字的说明
element = etree.HTML(res.text)
data_ids = element.xpath('//a[@class="aw-topic-name"]/@data-id')
for data_id in data_ids:
q_data_ids.put(data_id)
上述代码最重要的是捕获 data-id
,即热门话题的 ID 队列,该 ID 值会用在消费者函数中,用于请求详情页数据。
接下来就是消费者函数的实现,该逻辑主要用于抓取下图所示数据。
以上数据在测试过程中发现如下接口格式,其中 data_id
在生产者中产生,start_page
由循环迭代生成。
https://www.jisilu.cn/question/ajax/discuss/sort_type-new__topic_id-{data_id}__page-{start_page}
上述地址即为最后目标数据所在地址,我们需要拼凑出标准地址,然后再对 start_page
页码,进行迭代,遍历获取全部数据。
学习时请注意下述代码注释,技术层级难度不大,重点为实现逻辑。
# 消费者函数
def consumer():
# 死循环读取 data_id 值,用于拼凑话题详情页数据
while True:
# 取一个分类ID
data_id = q_data_ids.get()
q_data_ids.task_done()
if data_id is None:
break
# start_page 初始值设置为 1,即从第一个开始读取数据
start_page = 1
# URL 拼接
url = f'https://www.jisilu.cn/question/ajax/discuss/sort_type-new__topic_id-{data_id}__page-{start_page}'
res = requests.get(url=url, headers=get_headers(), timeout=5)
text = res.text
# 通过判断 text 是否为空,确定是否继续解析数据
while len(text) > 0:
url = f'https://www.jisilu.cn/question/ajax/discuss/sort_type-new__topic_id-{data_id}__page-{start_page}'
res = requests.get(url=url, headers=get_headers(), timeout=5)
# print(res.url)
text = res.text
start_page += 1
# 如果 text 不为空,则解析数据,并存储数据
if len(text)>0:
element = etree.HTML(res.text)
titles = element.xpath('//h4/a/text()')
urls = element.xpath('//h4/a/@href')
names = element.xpath('//a[@class="aw-user-name"]/text()')
data = zip(titles,names,urls)
save_list = [f"{item[0]},{item[1]},{item[2]}\n" for item in data]
long_str = "".join(save_list)
with open("./data.csv","a+",encoding="utf-8") as f:
f.write(long_str)
该程序实现的步骤与逻辑相对会绕一些,故通过下图你可以再复盘进行理解,按照步骤 1,2,3 进行学习。
函数运行可是采用多线程实现,具体如下:
# 开启2个生产者线程
for p_in in range(1, 3):
p = threading.Thread(target=producer)
p.start()
# 开启2个消费者线程
for p_in in range(1, 2):
p = threading.Thread(target=consumer)
p.start()
- 点赞
- 收藏
- 关注作者
评论(0)