Spider实战系列-爬取鬼吹灯小说
第一次发表实战类型的爬虫文章,如果有那里不明白或者出现bug的可以找我私信,欢迎大家在下面评论,可以给出我更好的建议,欢迎大家指正.
网站链接放在这里了鬼吹灯
主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴可以先关注我一手,后续会整理理论的知识放在专栏里
整体思路
- 得到鬼吹灯页面的源码
- 解析源码得到每一个章节的url
- 得到书名,这个书名通过切片得到
- 通过url得到一个页面的内容
- 使用并发执行多个任务下载
代码实现
导包
import asyncio
import os
from aiohttp import ClientSession
import requests
import aiofiles
from bs4 import BeautifulSoup
复制代码
得到页面源码的方法
参数是传入的url
返回出页面的源码
def get_page_source(url):
"""
获取页面源码的方法
:param url: 传入的url
:return: 返回的是页面的源码
"""
headers={
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
}
res = requests.get(url,headers=headers)
data = res.content.decode()
return data
复制代码
解析页面得到url
在这里,我使用了集合来存储每一个章节的url,使用xpath来得到章节url,我个人是比较喜欢使用xpath,在这里给出另一种写法,使用的是的beautifulSoup
在页面F12查看,我们找到的是div下的ul下的li下的a标签的属性href
写法一:使用xpath
def parse_page_source(html):
"""
对页面进行解析,得到我们每一个章节的url
:param html: 传入的页面源码
:return: 返回的是一个带有所有章节url的集合
"""
book_list=[]
tree = etree.HTML(html)
mulu_list = tree.xpath('//div[@class="mulu-list quanji"]')
for mulu in mulu_list:
# 抓取整个页面下章节的url
a = mulu.xpath('./ul/li/a/@href')
# 注意这里一定要在for循环里添加集合
book_list.append(a)
return book_list
复制代码
写法二:bs4
def parse_page_source(html):
"""
对页面进行解析,得到我们每一个章节的url
:param html: 传入的页面源码
:return: 返回的是一个带有所有章节url的集合
"""
book_list = []
soup = BeautifulSoup(html, 'html.parser')
a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'})
for a in a_list:
a_list = a.find_all('a')
for href in a_list:
chapter_url = href['href']
book_list.append(chapter_url)
return book_list
复制代码
得到和章节名
通过传入的章节url,进行切片以下面一个链接为例
https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html
我们按照/切分就有了['https:', '', 'www.51shucheng.net', 'daomu', 'guichuideng', 'jingjuegucheng', '2464.html'] 然后我们取倒数第二个元素就有了b_name
def get_book_name(chapter_url):
"""
得到名称,为了后续下载好分辨
:param chapter_url:
:return:
"""
book_chapter_name = chapter_url.split('/')[-2]
return book_chapter_name
复制代码
下载一个章节的内容
使用xpath来写
async def aio_download_one_content(chapter_url, single):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 使用async with single就是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for i in range(10):
try:
async with single:
async with ClientSession() as session:
async with session.get(chapter_url) as res:
# 得到章节内容的页面的源码
page_source = await res.content.read()
tree = etree.HTML(page_source)
# 章节名称
base_title = tree.xpath('//h1/text()')[0]
if(':'in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number+con
else:
title=base_title
# 章节内容
content = tree.xpath('//div[@class="neirong"]/p/text()')
chapter_content = '\n'.join(content)
if not os.path.exists(f'{book_name}/{c_name}'):
os.makedirs(f'{book_name}/{c_name}')
async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
encoding='utf-8') as f:
await f.write(chapter_content)
print(chapter_url, "下载完毕!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失败!, 重新下载. ")
return chapter_url
复制代码
这段代码单独拿出来是因为有的章节名称是这样的<<第19章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,如果遇到:就把前面和后面的数据切出来在组合,如果没遇到就让一个新的变量来接收base_title
# 章节名称
base_title = tree.xpath('//h1/text()')[0]
if(':'in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number+con
else:
title=base_title
复制代码
使用bs4来写
async def aio_download_one(chapter_url, signal):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 使用async with single就是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for c in range(10):
try:
async with signal:
async with aiohttp.ClientSession() as session:
async with session.get(chapter_url) as resp:
# 得到章节内容的页面的源码
page_source = await resp.text()
soup = BeautifulSoup(page_source, 'html.parser')
# 章节名称
base_title = soup.find('h1').text
if (':' in base_title):
number = base_title.split(':')[0]
con = base_title.split(':')[1]
title = number + con
else:
title = base_title
# 章节内容
p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p')
content = [p.text + '\n' for p in p_content]
chapter_content = '\n'.join(content)
if not os.path.exists(f'{book_name}/{c_name}'):
os.makedirs(f'{book_name}/{c_name}')
async with aiofiles.open(f'{book_name}/{c_name}/{title}.txt', mode="w",
encoding='utf-8') as f:
await f.write(chapter_content)
print(chapter_url, "下载完毕!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失败!, 重新下载. ")
return chapter_url
复制代码
协程下载
async def aio_download(url_list):
# 创建一个任务列表
tasks = []
# 设置最多10个任务并行运作
semaphore = asyncio.Semaphore(10)
for h in url_list:
tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore)))
await asyncio.wait(tasks)
复制代码
主函数运行
主函数运行就没什么可说的了,这里注意一点就是最后不要loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,情况如下,还剩一点就爬完了,结果报错了
if __name__ == '__main__':
url = 'https://www.51shucheng.net/daomu/guichuideng'
book_name = '鬼吹灯'
if not os.path.exists(book_name):
os.makedirs(book_name)
source = get_page_source(url)
href_list = parse_page_source(source)
loop = asyncio.get_event_loop()
loop.run_until_complete(aio_download(href_list))
复制代码
完成效果图
我就不一一截图了
总结
为什么我在这里比对了xpath和bs4两种代码,小伙伴可以仔细看一下,在xpath中,我想拿到数据,找到它,大量的使用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时候还会超时导致报错.所以我们使用xpath的时候,想让他的速度提高,最好有一个指定的点 到了再//
可以理解为下面这种情况
def getList():
url = 'https://www.xiurenba.cc/XiuRen/'
response = requests.get(url, headers=headers)
data = response.c
ontent.decode()
# print(data)
tree = etree.HTML(data)
li_list = tree.xpath('//ul[@class="update_area_lists cl"]/li')
return li_list
复制代码
def get_single_url():
li_list = getList()
for li in li_list:
single_url = 'https://www.xiurenba.cc' + li.xpath('./a/@href')[0]
复制代码
还有就是遇到了特殊符号要把它干掉,或者替换掉,这样就可以正常爬取数据
如果有小伙伴想要直接拿取源码的话,可以顺着代码实现一步步粘贴过去
- 点赞
- 收藏
- 关注作者
评论(0)