数据仓库行为域ODS开发

举报
tea_year 发表于 2025/09/19 18:26:04 2025/09/19
【摘要】 行为域ODS开发数据仓库的经典定义:数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化(Time Variant)的数据集合,用于支持管理决策(Decision Making Support)。一、行为域ODS层详细设计1 ODS层功能ODS:操作数据层主要作用:直...

行为域ODS开发

数据仓库的经典定义:

数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化(Time Variant)的数据集合,用于支持管理决策(Decision Making Support)。


一、行为域ODS层详细设计

1 ODS层功能

ODS:操作数据层

主要作用:直接映射操作数据(原始数据),数据备份;

建模方法:与原始数据结构保持完全一致

存储周期:相对来说,存储周期较短;视数据规模,增长速度,以及业务的需求而定;对于埋点日志数据ODS层存储,通常可以选择3个月或者半年;


  • 模拟生成日志数据

1.将genlog.sh、log-generator-jar-with-dependencies.jar上传到/root目录下
2.在/root目录下创建一个moni_data文件夹
3.sh genlog.sh user   先生成user用户日志数据
4.sh genlog.sh log   在生成用户的行为日志数据 【最终需要的日志数据/root/moni_data/app.access.log.2022-11-08】

了解日志数据包含的字段
{
"account": "第五园",                           -- 用户名
"appid": "app1",                             -- app的唯一标识
"appversion": "8.1",                         -- app的版本号
"carrier": "360移动",                         -- 运营商
"deviceid": "OHFMEDDHWQGQ",                   -- 设备编号
"devicetype": "GALAXY-6",                     -- 设备类型(手机型号)
"eventid": "thumbUp",                         -- 事件类型ID
"ip": "34.142.34.41",                         -- 外网ip地址
"latitude": 27.48026698890073,               -- 纬度
"longitude": 107.10385540147524,             -- 经度
"nettype": "WIFI",                           -- 网络类型 4g 5g wifi
"osname": "android",                         -- 操作系统
"osversion": "8.5",                           -- 操作系统版本号
"properties": {                               -- 属性 不同的eventid事件,拥有不同的属性
"itemId": "00768",
"refUrl": "/teachers/tea0838.html",
"pageId": "tea0927",
"url": "/teachers/tea0927.html"
},
"releasechannel": "木蚂蚁安卓应用市场",         -- 应用市场
"resolution": "2048*768",                     -- 分辨率
"sessionid": "kvmqpnwn",                     -- 会话ID
"timestamp": 1667867494026                   -- 时间戳
}


二、行为域ODS层开发需求

1.存储规划
数据类型 输入路径(HDFS目录) 目标位置(HIVE表)
app端埋点日志 /logdata/app/2022-11-08/ ods.app_event_log 分区:2022-11-08

1.在HDFS上创建一个目录/logdata/app/2022-11-08 hdfs dfs -mkdir -p /logdata/app/2023-06-22

2.使用put命令,将日志上传到HDFS的/logdata/app/2022-11-08目录下 hdfs dfs -put /root/moni_data/app.access.log.2023-06-22 /logdata/app/2023-06-22

2.入库要求
  • 原始日志格式

普通文本文件,JSON数据格式,导入hive表后,要求可以很方便地select各个字段

  • 分区表

  • 外部表


3 hive建表加强

Json数据的hive解析

由于原始数据是普通文本文件,而文件内容是json格式的一条一条记录

在创建hive表结构进行映射时,有两种选择:

  1. 将数据视为无结构的string

  2. 将数据按json格式进行映射(这需要JsonSerde 的支持)


本项目采用方案2来进行建表映射

使用HIVE内置JsonSerDe

image-20220815233325214.png

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'

官方文档: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-JSON


4.日志数据建表

app事件日志建表

create database ods;
drop table if exists ods.app_event_log;
create table ods.app_event_log(
account           string,
appid             string,
appversion       string,
carrier           string,
deviceid         string,
devicetype       string,
eventid           string,
ip               string,
latitude          double,
longitude         double,
nettype           string,
osname           string,
osversion         string,
properties       Map<String,String>,
releasechannel   string,
resolution       string,  
sessionid         string,
`timestamp`       bigint
)
partitioned by(dt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE;

注意:使用 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe' 这种方式 ​ 需要json中的属性名和表中的字段名保持一致


5.入库命令
load data inpath '/logdata/app/2022-11-08' into table ods.app_event_log partition(dt='2022-11-08')


6.脚本开发

01.日志数据导入ODS层.sh

#! /bin/bash

dt1=$1

if [ 'x'$1 == 'x' ];then
 dt1=$(date -d'-1 day' +%Y-%m-%d)
fi

echo $dt1

#hive -e "sql"
hive -e "load data inpath '/logdata/app/$dt1' into table ods.app_event_log partition(dt='$dt1')"



三、行为域DWD开发

1. 概要设计

1.存储规划
数据类型 源表 目标表
app端埋点日志 ods.event_app_log dwd.event_log_detail
2 技术选型

由于本层数据ETL的需求较为复杂,用hive sql实现非常困难

因而此环节将开发spark程序来实现


2. 需求分析

1.清洗过滤

1,去除json数据体中的废弃字段(前端开发人员在埋点设计方案变更后遗留的无用字段)

2,过滤掉json格式不正确的(脏数据)

3,过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录

4,过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)


a)、创建数据库和一张表

create database tmp;   -- 临时层 ods层数据经过处理先写入tmp,因为还没有完全处理成dwd层希望的结果
-- 临时层:数据清洗之后存储event_log_washed
create table tmp.event_log_washed(
account           string,
appid             string,
appversion       string,
carrier           string,
deviceid         string,
devicetype       string,
eventid           string,
ip               string,
latitude          double,
longitude         double,
nettype           string,
osname           string,
osversion         string,
properties       Map<String,String>,
releasechannel   string,
resolution       string,  
sessionid         string,
`timestamp`       bigint
)
partitioned by(dt string)


b)、创建一个类AppLogWash,读取ods.app_event_log表的数据写入tmp.event_log_washed

① 添加依赖
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
</dependency>

② 在resources目录下创建一个application.properties
#local.run为true代表在本地测试,否则在集群测试
local.run=false

③ 读取配置文件ConfigUtils工具类
package cn.yh.utils

import com.typesafe.config.{Config, ConfigFactory}

/**
* 1.在pom.xml添加一个依赖
* 2.在resources目录下创建一个application.properties配置文件
* 3.编写这个工具类
*/
object ConfigUtils {

 //加载resources目录下的application.properties配置文件
 private val config: Config = ConfigFactory.load()
 //根据配置文件中的key获取value
 val LOCAL_RUN:Boolean = config.getBoolean("local.run")

}

④ 获取SparkSession对象的工具类
package cn.yh.utils

import org.apache.spark.sql.SparkSession

object SparkUtils {

 def getSparkSession(appName:String) = {
   if(ConfigUtils.LOCAL_RUN){
     val spark: SparkSession = SparkSession.builder()
                                          .master("local[*]")
                                          .appName(appName)
                                          .enableHiveSupport()
                                          .getOrCreate()
     spark
  }else{
     val spark: SparkSession = SparkSession.builder()
                                          .master("spark://hadoop10:7077")
                                          .appName(appName)
                                          .enableHiveSupport()
                                          .getOrCreate()
     spark
  }
}

}


⑤ 数据清洗类
package cn.yh.ods_etl

import cn.yh.utils.SparkUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

object AppLogWash {

 def main(args: Array[String]): Unit = {
   if(args.length == 0){
     println("请提供时间参数")
     System.exit(0)
  }
   val time = args(0)
   println(s"参数1 ${time}")

   val spark: SparkSession = SparkUtils.getSparkSession("数据清洗-AppLogWash")

   val sql =
     s"""
       |
       |insert overwrite table tmp.event_log_washed
       |partition(dt='${time}')
       |select
       | account       ,
       | appid         ,
       | appversion     ,
       | carrier       ,
       | deviceid       ,
       | devicetype     ,
       | eventid       ,
       | ip             ,
       | latitude       ,
       | longitude     ,
       | nettype       ,
       | osname         ,
       | osversion     ,
       | properties     ,
       | releasechannel ,
       | resolution     ,
       | sessionid     ,
       | `timestamp`
       |from ods.app_event_log
       |where dt = '${time}'
       |     and deviceid is not null and trim(deviceid) != ''
       |     and eventid is not null and trim(eventid) != ''
       |     and properties is not null and size(properties) != 0
       | and sessionid is not null and trim(sessionid) != ''
       | and unix_timestamp(to_utc_timestamp('${time}','GMT+8'),'yyyy-MM-dd')*1000 <= `timestamp`
       | and unix_timestamp(to_utc_timestamp(date_add('${time}',1),'GMT+8'),'yyyy-MM-dd')*1000 > `timestamp`
       |
       |""".stripMargin

   spark.sql(sql)

   spark.stop()
}
}

⑥ 编写【02.数据清洗.sh】脚本,测试AppLogWash类
[root@hadoop10 app]# cat /root/shell/02.数据清洗.sh
#! /bin/bash

dt=$1

if [ 'x'$dt == 'x' ];then
dt=$(date -d'-1 day' +'%Y-%m-%d')
fi

echo $dt
spark-submit --master spark://hadoop10:7077 --class cn.yh.ods_etl.AppLogWash /opt/app/spark-dw.jar $dt
[root@hadoop10 app]# sh /root/shell/02.数据清洗.sh 2022-11-08


2.SESSION分割

1.什么是session(会话) 浏览器访问一个系统,需要登陆,登陆之后点击收藏、订单、浏览记录则不需要再登陆,浏览器默认的会话时间是30分钟 手机APP(客户端)往往不需要像浏览器这样频繁的登陆,因为sessionid(token、票据)存活时间比较长,所以手机APP一次会话时间比较长 手机APP的一次会话往往时间较长,会超出30分钟

2.session切割? 因为APP的session往往持续时间比较长,不像浏览器那样,有固定的30分钟,所以对APP上的行为进行会话内的分析是不准确的 当一个行为发生时间相比较上一个行为的时间超过10分钟,则认为用户刚才将APP后台或者退出了,则从该行为开始属于一个新的会话

设备编号     事件名称     时间                    sessionid   
deviceid1   event1     2022-10-14 10:10:10     abc
deviceid1   event2     2022-10-14 10:10:40     abc
deviceid1   event3     2022-10-14 10:13:40     abc
deviceid1   event4     2022-10-14 10:16:40     abc
deviceid1   event5     2022-10-14 10:19:40     abc
deviceid1   event6     2022-10-14 10:25:40     abc
deviceid1   event7     2022-10-14 11:46:40     abc
deviceid1   event8     2022-10-14 11:49:40     abc

进行session切割之后的数据
设备编号   事件名称       时间                 sessionid     newsessionid
deviceid1   event1     2022-10-14 10:10:10     abc           abc-0    
deviceid1   event2     2022-10-14 10:10:40     abc           abc-0
deviceid1   event3     2022-10-14 10:13:40     abc           abc-0
deviceid1   event4     2022-10-14 10:16:40     abc           abc-0
deviceid1   event5     2022-10-14 10:19:40     abc           abc-0
deviceid1   event6     2022-10-14 10:25:40     abc           abc-0
deviceid1   event7     2022-10-14 11:46:40     abc           abc-1
deviceid1   event8     2022-10-14 11:49:40     abc           abc-1

总结:通过对用户的行为按照时间进行session会话的切割,可以对设备的行为按照会话级别粒度进行分析,例如一个用户在一次会话中从哪进入了产品,又从哪退出了产品,在一次会话中用户的停留时长,在一次会话中用户访问了多少个页面等


a) 创建一张表,存储session切割之后的结果

create table tmp.event_log_splited(
account           string,
appid             string,
appversion       string,
carrier           string,
deviceid         string,
devicetype       string,
eventid           string,
ip               string,
latitude          double,
longitude         double,
nettype           string,
osname           string,
osversion         string,
properties       Map<String,String>,
releasechannel   string,
resolution       string,  
sessionid         string,
`timestamp`       bigint,
newsessionid     string
)
partitioned by(dt string)


b) 创建一个类AppLogSessionSplit,通过sparksql完成session会话切割操作

package cn.yh.ods_etl

import cn.yh.utils.SparkUtils
import org.apache.spark.sql.SparkSession

object AppLogSessionSplit {

 def main(args: Array[String]): Unit = {
   if(args.length == 0){
     println("请输入时间,格式为(yyyy-mm-dd)")
     System.exit(0)
  }
   val time = args(0)

   val spark: SparkSession = SparkUtils.getSparkSession("session切割")

   spark.sql(
     s"""
       |
       |insert overwrite table tmp.event_log_splited
       |partition (dt='${time}')
       |select   account             ,
|         appid               ,
|         appversion         ,
|         carrier             ,
|         deviceid           ,
|         devicetype         ,
|         eventid             ,
|         ip                 ,
|         latitude           ,
|         longitude           ,
|         nettype             ,
|         osname             ,
|         osversion           ,
|         properties         ,
|         releasechannel     ,
|         resolution         ,
|         sessionid           ,
|         `timestamp`         ,
       |       concat(sessionid,'-',sum(ts) over(partition by sessionid order by `timestamp`)) newsessionid
       |from(
       |select
       |   *,
       |   if( (`timestamp`-lag(`timestamp`,1,`timestamp`) over(partition by sessionid order by `timestamp`))/1000/60 > 10,1,0 ) ts
       | from tmp.event_log_washed t1
       | where dt = '${time}'
       |)t2
       |
       |""".stripMargin)

   spark.stop()
}
}


3. 数据集成(本质上是做维度退化)

1,将日志中的GPS经纬度坐标解析成省、市、县(区)信息;(为了方便后续的地域维度分析)

2,将日志中的IP地址解析成省、市、县(区)信息;(为了方便后续的地域维度分析)

注:app日志和wxapp日志,有采集到的用户事件行为时的所在地gps坐标信息

web日志则无法收集到用户的gps坐标,但可以收集到ip地址

gps坐标可以表达精确的地理位置,而ip地址只能表达准确度较低而且精度较低的地理位置


地理位置解析
  • 高德逆地理位置

参考:高德开发者平台
https://lbs.amap.com/api/webservice/summary/

使用scala代码完成上述接口的查询工作
① 添加依赖
<dependency>
   <groupId>org.scalaj</groupId>
   <artifactId>scalaj-http_2.11</artifactId>
   <version>2.4.1</version>
</dependency>

<dependency>
   <groupId>cn.hutool</groupId>
   <artifactId>hutool-all</artifactId>
   <version>4.6.8</version>
</dependency>

② 测试
package test

import scalaj.http.{Http, HttpRequest, HttpResponse}

object Test_高德逆地理位置测试 {

 def main(args: Array[String]): Unit = {
   //1.创建request请求对象,发送url请求
   var request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
   request = request.param("key","自己申请的key")
                    .param("location","104.679284,31.534707")

   //2.获取响应对象,并且将响应对象中body里面的内容转换成字符串
   val response: HttpResponse[String] = request.asString
   //3.获取body里面的内容
   val body: String = response.body

   //4.使用hutool解析body里面的省市区
   //{"status":"1","regeocode":{"addressComponent":{"city":"绵阳市","province":"四川省","adcode":"510703","district":"涪城区","towncode":"510703103000","streetNumber":{"number":"13正","location":"104.681946,31.533269","direction":"东南","distance":"298.728","street":"兴龙街"},"country":"中国","township":"青义镇","businessAreas":[[]],"building":{"name":[],"type":[]},"neighborhood":{"name":[],"type":[]},"citycode":"0816"},"formatted_address":"四川省绵阳市涪城区青义镇G5京昆高速"},"info":"OK","infocode":"10000"}
}
}
  • 根据高德逆地理位置,解析经纬度解析省市区写入DWD层

create database dwd;
create table dwd.event_log_detail(
account           string,
appid             string,
appversion       string,
carrier           string,
deviceid         string,
devicetype       string,
eventid           string,
ip               string,
latitude          double,
longitude         double,
nettype           string,
osname           string,
osversion         string,
properties       Map<String,String>,
releasechannel   string,
resolution       string,  
sessionid         string,
`timestamp`       bigint,
newsessionid     string,
province         string,  
city             string,  
district         string
)
partitioned by(dt string)


package cn.yh.ods_etl

import cn.hutool.json.{JSONObject, JSONUtil}
import cn.yh.utils.SparkUtils
import scalaj.http.{Http, HttpRequest, HttpResponse}

object AppLogToDWD {
def main(args: Array[String]): Unit = {
  val spark = SparkUtils.getSparkSession("根据经纬度查询地理位置")

  //自定义函数,根据经纬度查询省市区
  //三个参数:经度/纬度/标识符号(province、city、district)
  spark.udf.register("get_cityinfo",(longitude:String,latitude:String,s1:String)=>{
    var request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
    request = request.param("key","e2cc8f0f89145aa2f9b3da7ee99cd91b")
                      .param("location",longitude+","+latitude)

    val httpResponse: HttpResponse[String] = request.asString
    val jSONObject: JSONObject = JSONUtil.parseObj(httpResponse.body)
    val status: String = jSONObject.getStr("status") //返回结果状态值 返回值为0或1,0表示请求失败;1表示请求成功。
    var name = ""
    if("1".equals(status)){
      val jSONObject1 = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
      if("province".equals(s1)){
        name = jSONObject1.getStr("province")
      }else if("city".equals(s1)){
        name = jSONObject1.getStr("city")
      }else if("district".equals(s1)){
        name = jSONObject1.getStr("district")
      }
    }else{
      name = "未知"
    }
    name
  })

  spark.sql(
     """
       |
       |insert overwrite table dwd.event_log_detail
       |partition(dt = '2022-11-08')
       |select   account       ,
       |         appid         ,
       |         appversion   ,
       |         carrier       ,
       |         deviceid     ,
       |         devicetype   ,
       |         eventid       ,
       |         ip           ,
       |         latitude     ,
       |         longitude     ,
       |         nettype       ,
       |         osname       ,
       |         osversion     ,
       |         properties   ,
       |         releasechannel,
       |         resolution   ,
       |         sessionid     ,
       |         `timestamp`   ,
       |         newsessionid ,
       |         get_cityinfo(round(longitude,6),round(latitude,6),'province')   province,
       |         get_cityinfo(round(longitude,6),round(latitude,6),'city')       city,
       |         get_cityinfo(round(longitude,6),round(latitude,6),'district')   district
       |from tmp.event_log_splited where dt = '2022-11-08' limit 1
       |
       |""".stripMargin)

  spark.stop()
}

}

注意:上述代码会造成高德API接口调用次数过多


  • 计算经纬度的GeoHash

GEOHASH编码介绍

Geohash编码是一种地理位置编码技术,它可将一个gps坐标(含经、纬度)点,转化为一个字符串;

wx3y5697

wx3y5694

通过编码后得到的字符串,表达的是:包含被编码gps坐标点的一个矩形范围;

wps1.jpg


GEOHASH编码原理

在地球经纬度范围内,不断通过二分来划分矩形范围,通过观察gps坐标点所落的范围,来反复生成0/1二进制码。

在满足精度要求后,将所得的二进制编码通过base32编码技术转成字符串码,如下所示:

wps3.jpg


GEOHASH码的精度

字符串长度越长,表达的精度越高,矩形范围越小,越逼近原gps坐标点;

相反,长度越短,表达的精度越低,矩形范围越大;

geohash码的精确度对应表格:

wps4.jpg

GEOHASH编码工具包

gps坐标 转码成 geohash编码,这个算法不需要自己手写,有现成的工具包

1.添加依赖

<dependency>
   <groupId>ch.hsr</groupId>
   <artifactId>geohash</artifactId>
   <version>1.3.0</version>
</dependency>

2.根据经纬度,生成一个GeoHash值

/**
* 参数1:纬度
* 参数2:经度
* 参数3:生成geohash字符串的长度
*/
val str1: String = GeoHash.geoHashStringWithCharacterPrecision(34.795013, 113.54602, 5)
println(str1)


将mysql_testdata.sql导入mysql数据库下,读取mysql数据库t_md_areas表中得数据,获取省市区街道、经纬度通过GeoHash处理经纬度,得到GeoHash编码,将带有GeoHash编码的结果写入Hive数仓

1.使用spark读取mysql中t_md_areas表中数据,存到DataFrame中
2.spark自定义函数,参数是经纬度,返回值是GeoHash编码
3.创建DataFrame对应临时表,进行自连接查询,获取省市区街道、经纬度,并且使用自定义函数处理经纬度获取GeoHash编码
4.将上述结果写入hive表中dim.area_geo (id编号、省、市、区、街道、经度、纬度、GeoHash编码)

创建dim层  create database dim;
计算GeoHash
package cn.yh.ods_etl

import java.util.Properties

import ch.hsr.geohash.GeoHash
import cn.yh.utils.{ConfigUtils, SparkUtils}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* 读取mysql数据库中t_md_areas表的数据,并且计算街道经纬的geohash编码
*/
object ComputerGeoHash {

 def main(args: Array[String]): Unit = {
   val spark: SparkSession = SparkUtils.getSparkSession("计算经纬度的GeoHash编码")
   //1.spark读取mysql表数据
   val prop = new Properties()
   prop.setProperty("driver",ConfigUtils.DRIVER)
   prop.setProperty("user",ConfigUtils.USERNAME)
   prop.setProperty("password",ConfigUtils.PASSWORD)
   val df: DataFrame = spark.read.jdbc(ConfigUtils.URL, "t_md_areas", prop)

   //2.自定义单行函数,根据经纬度统计geo_hash
   spark.udf.register("get_geohash",(latitude:String,longitude:String,n:Int)=>{
     val str: String = GeoHash.geoHashStringWithCharacterPrecision(latitude.toDouble, longitude.toDouble, n)
     str
  })

   //3.自连接,查询省市区、经纬度、5位长度geohash、6位长度的geohash
   df.createTempView("t_md_areas")
   val sql =  """
                |select t1.id,
                |       t1.AREANAME prov,t2.AREANAME city,t3.AREANAME region,
                | t4.AREANAME street,t4.BD09_LAT latitude,t4.BD09_LNG longitude,
                |       get_geohash(t4.BD09_LAT,t4.BD09_LNG,5) geohash5,
                |       get_geohash(t4.BD09_LAT,t4.BD09_LNG,6) geohash6
                |from t_md_areas t1
                |inner join t_md_areas t2 on t1.id = t2.parentid and t1.parentid = 0
                |inner join t_md_areas t3 on t2.id = t3.parentid
                |inner join t_md_areas t4 on t3.id = t4.parentid
                |where t4.BD09_LAT is not null and t4.BD09_LNG is not null
                |
                |""".stripMargin
   val df2: DataFrame = spark.sql(sql)

   //4.将df2的数据写入hive表中
   //dim数据库需要手动提前创建,saveAsTable底层会自动创建area_geo表
   df2.write.saveAsTable("dim.area_geo")

   spark.stop()
}
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。