locust实战

举报
建帅小伙儿 发表于 2022/09/25 01:53:00 2022/09/25
【摘要】 一.依赖接口 panglu_test.py文件 # coding=utf-8""" 作者:gaojs 功能: 新增功能: 日期:2022/4/8 18:03"""import jsonimport os.pathimport pprintimport time import requests# 定义...

一.依赖接口

panglu_test.py文件
  

   
  1. # coding=utf-8
  2. """
  3. 作者:gaojs
  4. 功能:
  5. 新增功能:
  6. 日期:2022/4/8 18:03
  7. """
  8. import json
  9. import os.path
  10. import pprint
  11. import time
  12. import requests
  13. # 定义xml转json的函数
  14. import xmltodict as xmltodict
  15. # 强制去掉控制台InsecureRequestWarning
  16. import urllib3
  17. urllib3.disable_warnings()
  18. class Stress:
  19. """
  20. 旁路认证类
  21. """
  22. def __init__(self):
  23. self.headers = {
  24. 'Content-Type': 'text/xml',
  25. 'Connection': 'close',
  26. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'
  27. }
  28. self.session = requests.session()
  29. self.session.keep_alive = False
  30. self.proxies = {"http": None, "https": None}
  31. def xmltojson(self, xml):
  32. """
  33. xml转成json
  34. :param xml:
  35. :return:
  36. """
  37. xml_data = xmltodict.parse(xml)
  38. json_data = json.dumps(xml_data, indent=1)
  39. return json_data
  40. def post_random(self, vsiteIp, appId):
  41. """
  42. 获取十位随机数方法
  43. :return:
  44. """
  45. url = 'https://%s/auth/getRandom' % vsiteIp
  46. data = '<?xml version="1.0" encoding="UTF-8"?>' \
  47. '<message>' \
  48. '<head>' \
  49. '<version>1.0</version>' \
  50. '<serviceType>OriginalService</serviceType>' \
  51. '</head>'\
  52. '<body>' \
  53. f'<appId>{appId}</appId>' \
  54. '</body>' \
  55. '</message>'
  56. try:
  57. rsp = self.session.post(url, data=data, headers=self.headers, verify=False, proxies=self.proxies)
  58. # print(rsp.text)
  59. json_data = self.xmltojson(rsp.text)
  60. json_data = json.loads(json_data)
  61. random_value = json_data['message']['body']['original']
  62. except requests.exceptions.ConnectionError:
  63. rsp.status_code = "Connection refused"
  64. # print(json_data, type(json_data))
  65. # print(random_value)
  66. return random_value
  67. def certificate_list(self):
  68. """
  69. 获取证书列表
  70. :return:
  71. """
  72. url = r'https://127.0.0.1:63451/NSSkfGetCertsListInfo?DllFilePath=D:\旁路认证\GM3000\mtoken_GM.dll'
  73. rsp = self.session.get(url, verify=False, proxies=self.proxies)
  74. json_data = json.loads(rsp.text)
  75. # print(json_data[0])
  76. def skfAttach(self, PlainText, CertIndex0, ukeyPIN, sm3HashNum):
  77. """
  78. skfAttach签名
  79. :return:
  80. """
  81. url = f'https://127.0.0.1:63451/NSSkfAttachedSign?PlainText={PlainText}&CertIndex={CertIndex0}&UsbKeyPin={ukeyPIN}&DigestArithmetic={sm3HashNum}'
  82. try:
  83. rsp = self.session.get(url, headers=self.headers, verify=False, proxies=self.proxies)
  84. json_data = json.loads(rsp.text)
  85. signature_value = json_data[0]['signedData']
  86. # print(signature_value)
  87. # print(signature_value)
  88. # 接口返回的述职和界面上的述职不太一样了需要吧‘-’和‘_’替换一下才行
  89. s1 = signature_value.replace('-', '+')
  90. s2 = s1.replace('_', '/')
  91. return s2
  92. except requests.exceptions.ConnectionError:
  93. rsp.status_code = "Connection refused"
  94. def skfDetach(self, PlainText, CertIndex, ukeyPIN, sm3HashNum):
  95. """
  96. skfdetach签名
  97. PlainText = 十位随机数
  98. CertIndex = 1
  99. sm3HashNum = 1.2.156.10197.1.401
  100. ukeyPIN = 12345678
  101. :return:
  102. """
  103. url = f'https://127.0.0.1:63451/NSSkfDetachedSign?PlainText={PlainText}&CertIndex={CertIndex}&UsbKeyPin={ukeyPIN}&DigestArithmetic={sm3HashNum}'
  104. try:
  105. rsp = self.session.get(url, headers=self.headers, verify=False, proxies=self.proxies)
  106. time.sleep(3)
  107. json_data = json.loads(rsp.text)
  108. signature_value = json_data[0]['signedData']
  109. # print(signature_value)
  110. # 接口返回的述职和界面上的述职不太一样了需要吧‘-’和‘_’替换一下才行
  111. s1 = signature_value.replace('-', '+')
  112. s2 = s1.replace('_', '/')
  113. return s2
  114. except requests.exceptions.ConnectionError:
  115. rsp.status_code = "Connection refused"
  116. # print(signature_value)
  117. def detach_auth(self, vsiteIp, appId):
  118. """
  119. 带着生成的数字签名去认证
  120. :return:
  121. """
  122. random_value = self.post_random(vsiteIp, appId)
  123. print(random_value)
  124. # 获取证书列表
  125. self.certificate_list()
  126. # detach数字签名
  127. detach_data = self.skfDetach(PlainText=random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
  128. url = 'https://%s/auth/authUser' % vsiteIp
  129. data = f'''<?xml version="1.0" encoding="utf-8"?>
  130. <message>
  131. <head>
  132. <version>1.0</version>
  133. <serviceType>authenService</serviceType>
  134. </head>
  135. <body>
  136. <appId>T1</appId>
  137. <authen>
  138. <authCredential authMode="password">
  139. <uname>test</uname>
  140. <pwd>test</pwd>
  141. </authCredential>
  142. <authCredential authMode="cert">
  143. <detach>{detach_data}</detach>
  144. <original>{random_value}</original>
  145. </authCredential>
  146. </authen>
  147. <attributes attributeType="portion">
  148. <attr name="X509Certificate.SubjectDN"></attr>
  149. </attributes>
  150. </body>
  151. </message>
  152. '''
  153. rsp = self.session.post(url, headers=self.headers, data=data, verify=False, proxies=self.proxies)
  154. json_data = self.xmltojson(rsp.text)
  155. json_body = json.loads(json_data)['message']['body']['attributes']['attr']['#text']
  156. print(json_body)
  157. return json_data
  158. # 测试接口代码
  159. test = Stress()
  160. # rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1', CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
  161. # print(rsp)
  162. for i in range(100000):
  163. rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1')
  164. text = json.loads(rsp)['message']['body']['attributes']['attr']['#text']
  165. print(f'================================================第 {i+1} 次身份认证成功!===============================================')
  166. with open('log.txt', mode='a', encoding='utf-8') as f:
  167. f.write(f'===========================================第 {i+1} 次身份认证成功!============================================\n')
  168. f.write(text + '\n')
  169. print(text)

二、locust脚本

stress_test.py文件
  

   
  1. # coding=utf-8
  2. """
  3. 作者:gaojs
  4. 功能:
  5. 新增功能:
  6. 日期:2022/4/8 18:25
  7. """
  8. import json
  9. import pprint
  10. import time
  11. from locust import HttpUser, between, task, TaskSet
  12. from panglu_test import Stress
  13. import os
  14. import logging
  15. # 强制去掉控制台InsecureRequestWarning
  16. import urllib3
  17. urllib3.disable_warnings()
  18. class TaskTest(TaskSet, Stress):
  19. # 执行并发前置动作,比如清理当前所有session
  20. def __init__(self, parent: "User"):
  21. super().__init__(parent)
  22. self.signature = None
  23. self.random_value = None
  24. def on_start(self):
  25. """
  26. description:每个用户执行压测之前都会获取随机数和数字签名
  27. :return:
  28. """
  29. # 每个用户执行压测之前都会获取随机数和数字签名
  30. print('====================清理log结束,压测开始, 获取随机数和数字签名!!!========================')
  31. self.random_value = Stress().post_random('192.168.120.209', 'T1')
  32. # 获取证书列表
  33. Stress().certificate_list()
  34. # detach会根据证书列表下标去匹配证书,下发数字签名,并返回签名结果
  35. self.signature = Stress().skfDetach(PlainText=self.random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
  36. # return self.random_value, self.signature
  37. # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高
  38. @task
  39. def stress(self):
  40. """
  41. 旁路认证身份认证请求
  42. :return:
  43. """
  44. url = '/auth/authUser'
  45. data = f'''<?xml version="1.0" encoding="utf-8"?>
  46. <message>
  47. <head>
  48. <version>1.0</version>
  49. <serviceType>authenService</serviceType>
  50. </head>
  51. <body>
  52. <appId>T1</appId>
  53. <authen>
  54. <authCredential authMode="password">
  55. <uname>t</uname>
  56. <pwd>t</pwd>
  57. </authCredential>
  58. <authCredential authMode="cert">
  59. <detach>{self.signature}</detach>
  60. <original>{self.random_value}</original>
  61. </authCredential>
  62. </authen>
  63. <attributes attributeType="portion">
  64. <attr name="X509Certificate.SubjectDN"></attr>
  65. </attributes>
  66. </body>
  67. </message>
  68. '''
  69. headers = {"Content-Type": "text/xml;charset=UTF-8"}
  70. time.sleep(1)
  71. rsp = self.client.request(method='POST',
  72. url=url,
  73. data=data,
  74. catch_response=True,
  75. headers=headers,
  76. name='旁路认证',
  77. verify=False,
  78. allow_redirects=False)
  79. time.sleep(1)
  80. # 添加断言,是否成功认证
  81. rsp_json = self.xmltojson(rsp.text)
  82. rsp_str = json.loads(rsp_json)
  83. pprint.pprint(rsp_str)
  84. # msg_status = rsp_str['message']['body']['authResultSet']['authResult'][1]['@success']
  85. msg = rsp_str['message']['body']['attributes']['attr']['#text']
  86. # 认证接口返回200,但是业务未必是成功的,所以我们要做断言,如果认证请求返回的结果中messageState为true,则认为认证通过
  87. print(msg)
  88. # print(rsp_json)
  89. if not msg:
  90. print('认证失败,请重试!')
  91. print(rsp_str)
  92. # url = 'http://httpbin.org/get'
  93. # self.client.request(method='GET', url=url)
  94. # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/
  95. def on_stop(self):
  96. pass
  97. class UserBehavior(HttpUser):
  98. host = 'https://192.168.120.209'
  99. # 每次请求停顿时间
  100. wait_time = between(5, 10)
  101. tasks = [TaskTest]
  102. if __name__ == "__main__":
  103. os.system("locust -f stress_test.py --host=https://192.168.120.209 --web-host=127.0.0.1")

文章来源: blog.csdn.net,作者:懿曲折扇情,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_41332844/article/details/126837371

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。