【IAM身份中心】通过脚本导出一个IAM身份中心的授权报告
【摘要】 0. 引言您在华为云上,可以通过IAM身份中心服务,管理多账号下用户身份与权限管理,详细可以参考IAM身份中心的产品手册。在日常运维过程中,您也可以在控制台,用户详情页面,查看为用户分配的账号与权限集。那么如果您的用户数量较大,想要实现批量的结果查询,这里为您提供了一个简单的代码样例,可以实现整个IAM身份中心实例的授权详情导出。获得的报告格式为csv文件,报告格式如下UserName:用户...
0. 引言
您在华为云上,可以通过IAM身份中心服务,管理多账号下用户身份与权限管理,详细可以参考IAM身份中心的产品手册。
在日常运维过程中,您也可以在控制台,用户详情页面,查看为用户分配的账号与权限集。

那么如果您的用户数量较大,想要实现批量的结果查询,这里为您提供了一个简单的代码样例,可以实现整个IAM身份中心实例的授权详情导出。
获得的报告格式为csv文件,报告格式如下
- UserName:用户名
- Account:该用户有权访问的账号名称
- PermissionSet:该用户访问该账号所拥有的权限集
- AssignFromGroup:如果该用户的权限是集成与用户组,则此处会显示用户组名称,如果为空,则代表该权限是直接授权给用户
| UserName | Account | PermissionSet | AssignFromGroup |
| zhangsan | Account_1 | AdministratorAccess | Admingroup |
| zhangsan | Account_2 | ViewOnlyAccess |
1. 前提条件
- 您已开通IAM身份中心,并拥有IAM身份中心服务readonly权限的秘钥。
- 本地已搭建Python运行环境;
- 参考Python_签名SDK与demo_AK/SK签名认证操作指导_开发指南_API签名指南-华为云,安装requests并下载apig的SDK与Demo;
2. 操作步骤
2.1 准备代码
- 解压APIG SDK与DEMO的压缩包,我们将获得结构如下的文件包,我们需要将本文提供的代码替换到main.py中。
|
名称 |
说明 |
|---|---|
|
apig_sdk\__init__.py |
SDK代码 |
|
apig_sdk\signer.py |
|
|
main.py |
替换这里的代码 |
|
licenses\license-requests |
第三方库license文件 |
您需要将代码中设置正确的参数:
REGIONID :开通IdC的region id,如cn-east-4;
HUAWEICLOUD_AK :替换为对应的ak
HUAWEICLOUD_SK : 替换为对应的sk
HUAWEICLOUD_SEC_TOKEN : 如果提供的是临时凭据,请替换为对应的Security token,否则不填即可
FILENAME_PREFIX :输出报告的文件前缀,可自行修改。
import requests, json, csv, os
from datetime import datetime
from apig_sdk import signer
REGIONID = "YOUR_REGION"
HUAWEICLOUD_AK = "YOUR_AK"
HUAWEICLOUD_SK = "YOUR_SK"
HUAWEICLOUD_SEC_TOKEN = ""
FILENAME_PREFIX="HW_IDC_REPORT"
def parse_content(content:str):
"""
Parse the returned result in JSON format.
param:
content: response content from request
return:
JSON parsing result of content, in list format.
"""
if not content:
print("Empty content provided to parse_content.")
return None
try:
parsed_data = json.loads(content)
if not isinstance(parsed_data, dict):
print("Parsed JSON data is not a list.")
return None
return parsed_data
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")
except Exception as e:
print(f"Unexpected error during JSON parsing: {e}")
return None
def post_request_return_parsed_data(url:str):
"""
send request url to huawei cloud and return parsed data.
param:
url: request url
return:
JSON parsing result of content, in list format.
"""
sig = signer.Signer()
sig.Key = HUAWEICLOUD_AK
sig.Secret = HUAWEICLOUD_SK
method = "GET"
headers = {"content-type": "application/json"}
if HUAWEICLOUD_SEC_TOKEN:
headers["X-Security-Token"] = HUAWEICLOUD_SEC_TOKEN
body = ""
# sign
r = signer.HttpRequest(method, url, headers, body)
sig.Sign(r)
resp = requests.request(method, url, headers=r.headers, data=body)
return parse_content(resp.content)
def get_instanceId_identityStoreId():
"""
reference url:https://support.huaweicloud.com/api-identitycenter/ListInstances.html.
"""
url = "https://identitycenter.myhuaweicloud.com/v1/instances"
response = post_request_return_parsed_data(url)
result = response["instances"]
instanceId = result[0]["instance_id"]
identityStoreId = result[0]["identity_store_id"]
return instanceId, identityStoreId
def get_user_list(identity_store_id:str):
"""
reference url:https://support.huaweicloud.com/api-identitycenter/ListUsers.html.
"""
url = f"https://identitystore.{REGIONID}.myhuaweicloud.com/v1/identity-stores/{identity_store_id}/users"
response = post_request_return_parsed_data(url)
result = response["users"]
return result
def get_group_list(identity_store_id:str):
"""
reference url:https://support.huaweicloud.com/api-identitycenter/ListGroups.html.
"""
url = f"https://identitystore.{REGIONID}.myhuaweicloud.com/v1/identity-stores/{identity_store_id}/groups"
response = post_request_return_parsed_data(url)
result = response["groups"]
return result
def get_account_list(identity_store_id:str):
"""
reference url:https://support.huaweicloud.com/api-organizations/ListAccounts.html.
"""
url = f"https://organizations.myhuaweicloud.com/v1/organizations/accounts"
response = post_request_return_parsed_data(url)
result = response["accounts"]
return result
def get_permissionSet_list(instanceId:str):
"""
reference url:https://support.huaweicloud.com/api-identitycenter/ListPermissionSets.html.
"""
url = f"https://identitycenter.myhuaweicloud.com/v1/instances/{instanceId}/permission-sets"
response = post_request_return_parsed_data(url)
result = response["permission_sets"]
return result
def get_assign_account_by_user(instanceId:str, userId:str):
"""
reference url:https://support.huaweicloud.com/api-identitycenter/ListAccountAssignmentsForPrincipal.html.
"""
url = f"https://identitycenter.myhuaweicloud.com/v1/instances/{instanceId}/account-assignments-for-principals?principal_id={userId}&principal_type=USER"
response = post_request_return_parsed_data(url)
result = response["account_assignments"]
return result
def get_attribute_from_list(listInfo:list, orgName:str, orgValue:str, tarName:str):
result = next((item for item in listInfo if item.get(orgName) == orgValue), None)
if result:
return result[tarName]
else:
return None
def write_to_csv(filename:str, data:list, fieldnames:list):
"""
Append data to a CSV file. If the file already exists and has a table header, the table header is not added.
Args:
filename: CSV file name
data: List of data to append
fieldnames: CSV table header
"""
file_exists = os.path.isfile(filename)
mode = 'a' if file_exists else 'w'
with open(filename, mode, newline='', encoding='utf-8-sig') as csvfile:
if fieldnames is None:
fieldnames = list(data[0].keys())
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
writer.writerows(data)
def put_assign_data_to_csv(assign_data:dict, username:str, group_list:list, account_list:list, permission_set_list:list, filename:str):
items = list()
for assignment in assign_data:
item = dict()
item["UserName"] = username
item["Account"] = get_attribute_from_list(account_list, "id", assignment["account_id"], tarName="name")
item["PermissionSet"] = get_attribute_from_list(permission_set_list, "permission_set_id", assignment["permission_set_id"], tarName="name")
if assignment["principal_type"] == "GROUP":
item["AssignFromGroup"] = get_attribute_from_list(group_list, "group_id", assignment["principal_id"], tarName="display_name")
else:
item["AssignFromGroup"] = ""
items.append(item)
fieldnames = ["UserName", "Account", "PermissionSet", "AssignFromGroup"]
write_to_csv(filename, items, fieldnames)
def generate_report_filename():
time_stamp = datetime.now().strftime('%Y%m%d%H%M%S')
return f"{FILENAME_PREFIX}_{time_stamp}.csv"
if __name__ == '__main__':
instanceId, identityStoreId = get_instanceId_identityStoreId()
print(f"your IdC Instance Id is {instanceId}, and IdentityStore Id is {identityStoreId}")
user_list = get_user_list(identityStoreId)
print(f"Total users number you have: {len(user_list)}")
group_list = get_group_list(identityStoreId)
print(f"Total groups number you have: {len(group_list)}")
account_list = get_account_list(instanceId)
print(f"Total accounts number you have: {len(account_list)}")
permissionSet_list = get_permissionSet_list(instanceId)
print(f"Total permissionSet number you have: {len(permissionSet_list)}")
fileName = generate_report_filename()
for user in user_list:
print((f"Check the user:{user['user_name']} and write into the result file"))
assign_data = get_assign_account_by_user(instanceId, user["user_id"])
put_assign_data_to_csv(assign_data, user["user_name"], group_list, account_list, permissionSet_list, fileName)
print(f"Done, please check your file {fileName}")
2.2 运行代码获取报告
(1)运行该main.py,即可获取IdC授权报告
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)