人工智能个人理财助手的用户行为分析:从埋点到洞察的完整实战

举报
江南清风起 发表于 2025/07/30 18:38:27 2025/07/30
【摘要】 人工智能个人理财助手的用户行为分析:从埋点到洞察的完整实战 1. 问题背景与业务目标 1.1 业务场景AI 个人理财助手(下文简称 IPA)通过对话式界面帮助用户完成「记账-预算-投资」闭环。我们希望回答:用户为什么中途流失?哪类功能最能提升留存?如何在合规前提下用 AI 向不同客群推送差异化理财建议? 1.2 北极星指标北极星指标:30 日留存率(Day30 Retention)。拆解公...

人工智能个人理财助手的用户行为分析:从埋点到洞察的完整实战

1. 问题背景与业务目标

1.1 业务场景

AI 个人理财助手(下文简称 IPA)通过对话式界面帮助用户完成「记账-预算-投资」闭环。我们希望回答:

  • 用户为什么中途流失?
  • 哪类功能最能提升留存?
  • 如何在合规前提下用 AI 向不同客群推送差异化理财建议?

1.2 北极星指标

北极星指标:30 日留存率(Day30 Retention)。拆解公式:
Day30 = f(首次激活完成度, 功能使用深度, 理财教育完成度, 推送触达效果)


2. 用户行为数据体系设计

2.1 事件-属性双层模型

事件类型 关键属性 示例
app_launch channel, os 小米商店、Android
onboarding_step step_id, duration step_3, 15.2s
bill_add bill_type, amount, auto_tag 餐饮, 28.5, False
budget_set category, amount 餐饮, 1200
invest_view product_id, risk_level 0001, R3
push_receive campaign_id, msg_type 2025q3_edu, 图文

2.2 数据仓库分层

ODS → DWD(事件明细)→ DWS(用户-日汇总)→ ADS(分析主题宽表)


3. 埋点与数据管道实现(Python 代码)

3.1 客户端埋点(简化版)

以 React-Native 为例,只展示 bill_add

// track.js
import { Analytics } from 'mobile-analytics-sdk';

export const trackBillAdd = (bill) => {
  Analytics.track('bill_add', {
    bill_type: bill.type,
    amount: bill.amount,
    auto_tag: bill.autoTag,
    ts: Date.now(),
    user_id: global.userId,
  });
};

3.2 服务端日志收集

使用 FastAPI + Kafka:

# logger_service.py
from fastapi import FastAPI, Request
from aiokafka import AIOKafkaProducer
import json, time, uvicorn

app = FastAPI()
producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')

@app.post("/collect")
async def collect(request: Request):
    payload = await request.json()
    payload['server_ts'] = int(time.time()*1000)
    await producer.send("user_events", json.dumps(payload).encode())
    return {"status": "ok"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.3 流式 ETL(Spark Structured Streaming)

# etl_stream.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("IPA_ETL").getOrCreate()

df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "user_events")
        .load()
        .selectExpr("CAST(value AS STRING) as json"))

schema = spark.read.json(df.rdd.map(lambda r: r.json)).schema

parsed = (df
  .select(from_json(col("json"), schema).alias("e"))
  .select("e.*")
  .withColumn("dt", to_date(from_unixtime(col("ts")/1000)))
  .withWatermark("ts", "10 minutes"))

query = (parsed.writeStream
           .format("parquet")
           .option("path", "/warehouse/ods/events")
           .option("checkpointLocation", "/chk/events")
           .partitionBy("dt")
           .trigger(processingTime='1 minute')
           .start())
query.awaitTermination()

4. 行为特征工程与标签化

4.1 特征分层

  1. 原始行为:事件序列
  2. 统计特征:7 日日均账单数、近 30 日预算超支率
  3. 语义特征:用户理财问题文本 embedding(Sentence-BERT)
  4. 图特征:用户-产品二部图 PageRank

4.2 标签化示例(PySpark)

# user_tag.py
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

df = spark.table("dws.user_daily")

# 计算「理财成熟度」
df = df.withColumn("maturity_score",
    F.when(F.col("invest_view_cnt_30d") > 5, 3)
     .when(F.col("budget_set_cnt_7d") > 0, 2)
     .otherwise(1))

# 金额分桶
bucketizer = Bucketizer(splits=[0, 500, 2000, 10000, float("inf")],
                        inputCol="avg_bill_amount_30d",
                        outputCol="amount_level")
df = bucketizer.transform(df)

df.select("user_id", "maturity_score", "amount_level") \
  .write.mode("overwrite").saveAsTable("ads.user_tag")

5. 深度行为分析:留存、路径与LTV

5.1 30 日留存拆解

-- retention.sql
WITH first_day AS (
  SELECT user_id, min(dt) AS first_dt
  FROM ods.events
  WHERE event='app_launch'
  GROUP BY 1
),
retention AS (
  SELECT f.user_id,
         datediff(e.dt, f.first_dt) AS delta
  FROM first_day f
  JOIN ods.events e
    ON f.user_id = e.user_id
)
SELECT delta, count(distinct user_id) as users
FROM retention
WHERE delta BETWEEN 0 AND 30
GROUP BY delta
ORDER BY delta;

用 Python 画留存曲线:

import pandas as pd, seaborn as sns, matplotlib.pyplot as plt
df = pd.read_sql("SELECT * FROM retention ORDER BY delta", conn)
sns.lineplot(x='delta', y='users', data=df)
plt.title("Cohort Retention")
plt.show()

5.2 行为路径 Sankey

使用 retentioneering 库:

import retentioneering as re
re.config.update_config(col_event='event',
                        col_user_id='user_id',
                        col_time='ts')
df = pd.read_parquet("/warehouse/ods/events")
dataset = re.Dataset(df)
dataset.rete.plot_graph(
    weight_col='user_id',
    thresh=0.03,
    targets=['invest_view'],
    width=800, height=600
)

5.3 LTV 预估(Gamma-Gamma + BG/NBD)

from lifetimes import GammaGammaFitter, BetaGeoFitter
df_rfm = spark.sql("""
  SELECT user_id,
         datediff(max(dt), min(dt)) as T,
         count(distinct dt) as frequency,
         sum(amount) as monetary_value
  FROM ods.events
  WHERE event='bill_add'
  GROUP BY 1
""").toPandas()

bgf = BetaGeoFitter(penalizer_coef=0.01)
bgf.fit(df_rfm['frequency'], df_rfm['T'])

ggf = GammaGammaFitter(penalizer_coef=0.01)
ggf.fit(df_rfm['frequency'], df_rfm['monetary_value'])

df_rfm['predicted_clv'] = ggf.customer_lifetime_value(
    bgf,
    df_rfm['frequency'],
    df_rfm['T'],
    df_rfm['monetary_value'],
    time=12,  # 12 个月
    freq_unit='D'
)

6. 个性化推荐与干预策略

6.1 强化学习推送

状态:用户成熟度、预算超支率、最近 7 日活跃度
动作:{教育卡片、基金推荐、记账提醒、沉默唤醒}
奖励:用户 3 日内活跃且投资转化率
使用 Stable-Baselines3 的 PPO:

import gym, numpy as np
from stable_baselines3 import PPO

class FinancePushEnv(gym.Env):
    def __init__(self, user_features):
        super().__init__()
        self.user = user_features
        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(10,), dtype=np.float32)
    def step(self, action):
        reward = simulate_reward(self.user, action)
        next_state = self.user.next_features()
        done = True
        return next_state, reward, done, {}
    def reset(self):
        return self.user.to_vec()

env = FinancePushEnv(user_features)
model = PPO("MlpPolicy", env, verbose=1).learn(total_timesteps=10000)

6.2 A/B 实验框架

# ab_test.py
import statsmodels.api as sm

def evaluate_exp(df):
    treat = df[df.variant=='push_v2']['converted']
    ctrl  = df[df.variant=='push_v1']['converted']
    return sm.stats.proportion_ztest([treat.sum(), ctrl.sum()],
                                     [len(treat), len(ctrl)])

p_value = evaluate_exp(spark.table("exp.push_exp").toPandas())
print("p-value:", p_value)

7. 隐私、合规与可解释性

  • 联邦学习:模型训练时不离开端侧,使用 TensorFlow Federated。
  • 可解释性:对强化学习策略使用 SHAP 分解状态特征贡献;对 LTV 使用 Gamma-Gamma 参数解释。
  • GDPR/CCPA:所有埋点加密 user_id,提供「一键忘记」接口:
# forget_user.py
from Crypto.Hash import SHA256
def anonymize(uid):
    return SHA256.new(uid.encode()).hexdigest()[:16]

8. 结论与下一步规划

  1. 当前 Day30 留存 18% → 实验组提升 4.7%,主要归因于「预算超支提醒」强化学习策略。
  2. 下一步:
    • 引入多模态数据(语音对话情绪)提升意图识别。
    • 继续扩展联邦学习,实现跨行数据合作训练。
    • 将 LTV 模型与实时竞价广告系统对接,实现动态获客出价。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。