【IAM身份中心】如何将用户与组从CSV批量导入到华为云IAM身份中心

举报
黄田雨 发表于 2025/09/26 10:44:40 2025/09/26
【摘要】 无论您是使用SAML2.0将外部身份源(IdP)连接到华为云IAM身份中心(后续都简称为IAM身份中心),还是直接使用华为云IAM身份中心,您都需要先在IAM身份中心里创建所有的用户与组。当然,如果您选择的是对接外部身份源(IdP),并且IdP支持通过SCIM进行用户和组的自动预置,我强烈建议您使用SCIM来简化在IAM身份中心里用户和组的生命周期管理。当使用SCIM时,IAM身份中心的用户...

无论您是使用SAML2.0将外部身份源(IdP)连接到华为云IAM身份中心(后续都简称为IAM身份中心),还是直接使用华为云IAM身份中心,您都需要先在IAM身份中心里创建所有的用户与组。

当然,如果您选择的是对接外部身份源(IdP),并且IdP支持通过SCIM进行用户和组的自动预置,我强烈建议您使用SCIM来简化在IAM身份中心里用户和组的生命周期管理。当使用SCIM时,IAM身份中心的用户、组以及用户与组的关联关系,将会实施与IdP保持同步,这将意味着您只需要在IdP上维护用户和组,而不需要在IAM中心上重复这份工作。

关于如何配置外部身份源(IdP)与自动预置(SCIM),您可以参考其他链接(比如IAM身份中心如何对接微软Entra ID),或者咨询你的身份提供商厂家。


如果您的身份提供商(IdP) 尚不支持自动预置,那么您就需要在IAM身份中心上手动创建用户和组。尽管手动创建用户和组的操作并不复杂,但您可以面临非常大且乏味的重复工作。

在这篇文章中,我会想你提供一个方法,使用CSV文件,实现在IAM身份中心上用户和组的批量创建。


前期准备

  • 您运行环境需要具备Python的执行环境,我想您可以非常容易获取到它;
  • 您需要安装华为云IAM身份中心的sdk,您可以在cmd中执行如下命令进行安装: 
pip install huaweicloudsdkidentitycenterstore
  • 在IAM身份中心所在的账号,或者组织上IAM身份中心的委托管理员账号上,创建一个IAM用户,该用户需要需要具备以下权限,并获取该用户的秘钥。(该用户可在使用后删除)


{
  "Version": "5.0",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "IdentityCenter:user:create",
        "IdentityCenter:group:create",
        "IdentityCenter:group:list",
        "IdentityCenter:groupMembership:create"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "organizations:delegatedAdministrators:list"
      ]
    }
  ]
}

将用户和组批量创建到IAM身份中心

准备好待导入的csv文件

  1. 创建一个任意名称的csv文件(如user_import.csv),它的首行需要包含如下:userName,emailAddress,givenName,familyName,displayName,memberOf

需要注意的是,memberOf列将包含您要在IAM身份中心里,所创建用户所属的用户组。您可以使用“;”分隔组名,已实现用户可以被关联到多个组。如果IAM身份中心上还没有同名的组,那么该脚本会帮您在IAM身份中心创建这些组。

同时,要使此功能正常工作,该csv中每个用户都必须指定userName,emailAddress,givenName,familyName,displayName的值,如果缺少任意一个,用户都无法成功创建。userName和emailAddress不可以与其他用户重复。


准备执行的脚本

  1. 接下来,创建一个py脚本(如IdC_Import_User.py),并将如下代码复制到其中保存。您可以使用记事本或者其他文本编辑器编辑该文件。
  • 将<REGIONID>替换为IAM身份中心所在的region id;

  • 将<IDCSTORE>替换为IAM身份中心的身份存储ID(您可以在IAM身份中心的总览页找到它);
  • 将<CSVLOCATION>替换为csv文件所在的位置;
  • 将<ACCESS_KEY><SECRET_KEY>替换为之前准备的用户秘钥。


#system
import csv
import logging

#HuaweiCloud SDK
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcore.http.http_config import HttpConfig

#Identity Store
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkidentitycenterstore.v1.region.identitycenterstore_region import IdentityCenterStoreRegion
from huaweicloudsdkidentitycenterstore.v1 import *

#########################################
# Please replace the configuration firstly
REGION = "REGIONID"
IDC_STORE_ID = "<IDCSTORE>"
FILE_URL = "<CSVLOCATION>"
ACCESS_KEY = "<ACCESS_KEY>"
SECRET_KEY = "<SECRET_KEY>"
#########################################

groupInfo = dict()
failedUserList = list()

logging.basicConfig(
    level=logging.INFO,
    filename='./run.log',
    filemode='w',
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

def get_users_from_csv():
    users = list()
    with open(FILE_URL, 'r') as fTable:
        readers = csv.DictReader(fTable)
        for reader in readers:
            users.append(reader)
    return users

def create_user(client, user):
    try:
        request = CreateUserRequest()
        request.identity_store_id = IDC_STORE_ID
        namebody = NameDto(
            family_name=user['familyName'],
            given_name=user['givenName']
        )
        listEmailsbody = [
            EmailDto(
                primary=True,
                type="Work",
                value=user['emailAddress']
            )
        ]
        request.body = CreateUserReqBody(
            password_mode="EMAIL",
            user_name=user['userName'],
            name=namebody,
            emails=listEmailsbody,
            display_name=user['displayName']
        )
        response = client.create_user(request)
        userID = response.user_id
        return userID

    except Exception as e:
        logging.error(f"Failed to create user: {user['userName']}. Reason: {e}")

def list_groups(client):
    try:
        request = ListGroupsRequest()
        request.identity_store_id = IDC_STORE_ID
        response = client.list_groups(request)
        global groupInfo
        for item in response.groups:
            groupInfo[item.display_name] = item.group_id
        return
    except exceptions.ClientRequestException as e:
        logging.error(f"Error Code: {e.status_code}, Error Message: {e.error_msg}")

def create_group(client, group_name):
    try:
        request = CreateGroupRequest()
        request.identity_store_id = IDC_STORE_ID
        request.body = CreateGroupReqBody(
            display_name=group_name
        )
        response = client.create_group(request)
        groupID = response.group_id
        return groupID
    except exceptions.ClientRequestException as e:
        logging.error(f"Failed to create group: {group_name}")
        logging.error(f"Error Code: {e.status_code}, Error Message: {e.error_msg}")

def create_group_membership(client, group_id, group_name, user_id, user_name):
    try:
        request = CreateGroupMembershipRequest()
        request.identity_store_id = IDC_STORE_ID
        memberIdbody = MemberIdDto(
            user_id=user_id
        )
        request.body = CreateGroupMembershipReqBody(
            member_id=memberIdbody,
            group_id=group_id
        )
        response = client.create_group_membership(request)
    except exceptions.ClientRequestException as e:
        logging.error(f"Failed to create membership, group mame: {group_name}, user name: {user_name}")
        logging.error(f"Error Code: {e.status_code}, Error Message: {e.error_msg}")

def process_user(client, user):
    userId = create_user(client, user)
    if not userId:
        logging.debug(f"Failed to create user : {user['userName']}")
        failedUserList.append(user['userName'])
        return
    if not user["memberOf"]:
        return

    groups = user["memberOf"].split(';')
    for groupName in groups:
        global  groupInfo
        if groupName not in groupInfo:
            id = create_group(client,groupName)
            groupInfo[groupName] = id
            logging.info(f"Create group success : {groupName}")
        groupId = groupInfo[groupName]
        create_group_membership(client, groupId, groupName, userId, user['userName'])
    return

def get_clients(region: str):
    basicCredentials = BasicCredentials(ACCESS_KEY, SECRET_KEY)
    identityStoreClient = IdentityCenterStoreClient.new_builder() \
        .with_credentials(basicCredentials) \
        .with_region(IdentityCenterStoreRegion.value_of(region)) \
        .build()
    return identityStoreClient

def IdC_Import_User():
    logging.info(f"Begin to Import user from {FILE_URL} to {IDC_STORE_ID}, region is {REGION}")
    client = get_clients(REGION)
    list_groups(client)
    logging.info(f"Total group fount in IAM Identity Center: {len(groupInfo)}")
    users = get_users_from_csv()
    logging.info(f"Total user in csv file: {len(users)}")
    for user in users:
        process_user(client, user)
    logging.info(f"Total user number: {len(users)}, failed user number: {len(failedUserList)}")
    logging.info(f"Failed user list: {failedUserList}")
if __name__ == "__main__":
    IdC_Import_User()

2. 在执行脚本后,你会得到一个名为run.log的日志文件,在文件中,你会获取创建失败的用户列表;你可以检查下这些用户创建失败的原因,并重新执行脚本或者在控制台上手工创建它们。

结尾

通过该脚本,可以在当您的身份提供商(IdP)不支持SCIM自动预置时,提高您首次在IAM身份中心上批量创建用户的效率。

一般该脚本会在您第一次使用IAM身份中心时并且由大量用户和组创建的工作场景下使用它。一旦您将用户批量创建好,后续的用户和组的生命周期管理,您可以在控制台上做维护。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。