selenium对接代理与seleniumwire访问开发者工具NetWork
之前我在《使用MitmProxy离线缓存360度全景网页》一文中演示了如何搭建python代理服务器MitmProxy。
但之前是纯手工访问网页缓存数据,如果我们希望能够自动访问网页并对接代理下载数据,可以通过selenium控制游览器实现自动访问。
对接selenium的代理服务器,有种用法是使用 browsermobproxy,它基于Java开发,需要在https://chromedevtools.github.io/devtools-protocol/tot/Network/下载对应文件。
参考:https://blog.csdn.net/u010741112/article/details/118674293
不过个人在研究后发现它只是一个基于Java开发的代理服务器,对编码的处理不够准确经常出现乱码,而且难以还原。
解决他的乱码问题,得添加Java代码的拦截器设置编码,例如设置文本都以GBK编码解码:
proxy = server.create_proxy()
# 根据实际网站设置
proxy.response_interceptor('''
if (contents.isText() ) {
response.headers().set("Content-Type", "text/json;charset=GBK");
}
''')
整体而言,个人感觉很难用。远远不如selenium直接对接MitmProxy代理用的方便。
selenium使用代理服务器的代码示例:
from selenium import webdriver
option = webdriver.ChromeOptions()
option.add_argument(f'--proxy-server=127.0.0.1:8080')
browser = webdriver.Chrome(options=option)
browser.get('https://www.baidu.com/')
这样只有我们就通过MitmProxy代理服务器获取所有经过selenium控制的游览器访问的数据,这样我们同时还实现自动化控制和数据获取之间的解耦,mitmdump加载的脚本专门复杂拦截数据并处理,selenium代码专门负责自动化控制。
而今天我要介绍的是seleniumwire这个库,关于这个库的完整用法可以参考:https://pypi.org/project/selenium-wire/
安装:
pip install selenium-wire
注意:Linux和Mac系统必须额外安装openssl用于解码https的数据。
CentOS:
yum install openssl
Mac:
brew install openssl
这个库基本上可以使用被控制的游览器访问留下的历史记录,可随意的下载,但是默认仅保存原始的字节数据,若遇到压缩数据需要自行解压。
下面我们以网易财经的行情中心数据演示一下,完整代码如下:
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from seleniumwire import webdriver
import json
import gzip
import time
import pandas as pd
browser = webdriver.Chrome()
browser.get('http://quotes.money.163.com/old/#query=dy019000&DataType=HS_RANK&sort=PERCENT&order=desc&count=24&page=0')
wait = WebDriverWait(browser, 30)
# 等待表格组件加载完毕
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "table.ID_table")))
def fetch_data():
for request in reversed(browser.requests):
if not request.url.startswith("http://quotes.money.163.com/hs/service/diyrank.php"):
continue
res = request.response
if not res or res.status_code != 200:
continue
if res.headers["content-encoding"] == "gzip":
# gzip解压
res.body = gzip.decompress(res.body)
return json.loads(res.body.decode('u8'))
def wait_loading():
"等待 等待组件的消失"
for _ in range(150):
if not browser.find_elements_by_css_selector("div.loading-cover"):
break
time.sleep(0.2)
data = []
for i in range(100):
wait_loading() # 等待加载完毕
page = fetch_data() # 从历史游览记录中获取数据
if page:
print(i, len(page['list']), page['list'][0]['CODE']) # 检测数据
data.extend(page['list']) # 插入数据
a_tag = browser.find_element_by_css_selector(
"div.ID_pages a:nth-last-of-type(1)")
del browser.requests # 清空游览器缓存
if a_tag.text != "下一页":
break
a_tag.click()
df = pd.DataFrame(data)
df
1048 rows × 25 columns
重点代码解释:
if res.headers["content-encoding"] == "gzip":
res.body = gzip.decompress(res.body)
这段代码实现对gzip压缩的网络数据进行自动解压,若存在其他类型的压缩还需要编写针对性代码,通用的解压缩代码:
import gzip
import zlib
import brotli
content_encoding = res.headers["content-encoding"]
if content_encoding == "gzip":
res.body = gzip.decompress(res.body)
elif content_encoding == "deflate":
res.body = zlib.decompress(res.body)
elif content_encoding == "br":
res.body = brotli.decompress(res.body)
wait_loading
方法用于检测数据是否已经加载完毕,原理时每隔0.2秒查看一下loading组件是否还存在,不存在则说明加载数据的过程已经结束。
browser.requests
缓存了游览器在访问过程获取的所有数据,就好像游览器开发者工具的Network,reversed(browser.requests)的目的是倒着查看数据,即优先查看最新获取的数据。通过这次方式,即使不清理历史缓存,也能较为正确获取数据。
del browser.requests
的作用是清理缓存,目的是在下一次点击访问前先清理历史缓存,从而使数据获取的速度变得更快。
文章来源: xxmdmst.blog.csdn.net,作者:小小明-代码实体,版权归原作者所有,如需转载,请联系作者。
原文链接:xxmdmst.blog.csdn.net/article/details/126004046
- 点赞
- 收藏
- 关注作者
评论(0)