APP企业级业务数据埋点系统(基于 Spring Boot & ClickHouse)
本文提出了一套基于Spring Boot的统一埋点系统设计方案。系统采用分层架构,包含埋点SDK、采集服务、消息队列、消费服务、ClickHouse存储和分析服务等模块。SDK提供注解和API两种接入方式,支持异步缓冲和降级策略。系统实现了从埋点采集到分析告警的完整闭环,通过消息队列解耦业务与存储,采用ClickHouse作为核心OLAP存储。规范方面定义了模块、页面、场景等标准字段,并设计了配套
文章目录
1. 背景与目标
- 背景:当前缺乏系统化的业务埋点体系,无法对用户行为和业务指标进行精细化分析。项目已提供埋点规范文件,定义了主要模块、页面、场景及事件。
- 目标:在不显著影响业务性能的前提下,构建一套基于 Spring Boot 的统一埋点系统,实现从 埋点采集 → 传输 → 存储 → 分析 → 监控告警 的完整闭环,并以 ClickHouse 作为核心 OLAP 存储。
2. 系统整体架构设计
2.1 功能模块
-
埋点 SDK(Server-Side SDK):
- 提供注解 + API 两种接入方式。
- 屏蔽埋点协议、批量发送、重试等复杂细节。
- 支持异步、缓冲队列和本地降级策略。
-
埋点采集服务(Ingestion API):
- 提供 HTTP 接口接收埋点事件(单条 / 批量)。
- 做基础校验、补充公共字段、落消息队列或本地缓冲。
-
消息队列层(RabbitMQ/Kafka):
- 解耦业务系统与存储,防止 ClickHouse 抖动反向影响业务。
- 支持批量消费、重试队列和死信队列。
-
埋点消费者服务(Consumer):
- 从 MQ 批量拉取事件,写入 ClickHouse 明细表。
- 支持批量 INSERT 与失败重试、告警。
-
ClickHouse 集群:
- 存储明细埋点数据 + 预聚合统计数据。
- 提供高并发 OLAP 查询能力。
-
分析与查询服务:
- 业务指标 API:UV、转化率、漏斗、留存等。
- 提供运营 / 产品侧可视化对接(BI、大屏)。
-
监控与告警:
- 埋点链路健康监控:QPS、失败率、MQ 堆积、ClickHouse 写入错误等。
- 告警分级推送(邮件 / IM / 短信)。
2.2 系统架构图(Mermaid)
3. 埋点规范与 API 设计
3.1 埋点事件规范
基于比如某个app业务系统中中定义的字段,统一抽象为如下逻辑字段:
- 模块 module:如“首页”“视频”“AI朋友”“礼物系统”“个人中心”“支付”等,对应 Excel 的“模块”。
- 页面 page:功能页面名称,如“首页”“视频聊天”“会员中心”等,对应“页面”。
- 场景 scenario:具体业务场景,如“刷视频视频”“送礼物”“支付页”等,对应“场景”。
- 事件类型 event_type:view / click / submit / event,对应“事件类型”。
- 事件名 event_name:用于技术侧唯一标识事件,如
app_launch,对应“事件名”。 - 触发 trigger_source:触发方式说明,如“点击”“弹窗展示”等,对应“触发”。
- 事件描述 description:业务方可读描述,对应“事件描述”。
- 说明 remark:补充说明,如“进入登录页面”“发起支付”等,对应“说明”。
- 参数 params:该事件特有的自定义属性,如搜索关键词、礼物ID等,对应“参数”,以 JSON 形式存储。
公共字段(由 SDK/服务统一补齐):
- event_id:全局唯一 ID(如 UUID)。
- user_id:登录用户 ID,游客为空或 0。
- device_id:设备唯一标识。
- session_id:会话 ID。
- platform:平台,如 ios / android / web。
- app_version:客户端版本号。
- os_version:操作系统版本。
- network_type:网络类型:wifi/4g/5g 等。
- channel:渠道,如应用商店渠道标识。
- server_time:服务端接收时间。
- client_time:客户端上报时间。
- env:环境,如 prod / test。
3.2 埋点 SDK 设计
3.2.1 接入方式
-
注解方式(推荐):
- 在 Controller / Service 方法上添加
@BizEvent注解,自动完成埋点。 - 适合固定流程事件(如“进入页面”“点击按钮”“提交表单”)。
- 在 Controller / Service 方法上添加
-
代码 API 方式:
- 适用于复杂逻辑或循环场景,业务代码可主动调用
TrackingClient.track(...)。
- 适用于复杂逻辑或循环场景,业务代码可主动调用
3.2.2 埋点注解设计
示例注解:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface BizEvent {
String module();
String page();
String scenario();
String eventName();
String eventType(); // view / click / submit / event
String description() default "";
String triggerSource() default "";
}
配套 AOP 切面自动采集:
- 从
UserContext中获取userId、deviceId等信息。 - 从请求中解析
platform、appVersion、channel等公共字段。 - 支持配置是否采集入参 / 返回值中的业务字段作为
params。
3.2.3 埋点 SDK API 设计
核心接口:
public interface TrackingClient {
/**
* 异步上报单条事件(默认方式)
*/
void track(AppEvent event);
/**
* 异步批量上报事件
*/
void trackBatch(List<AppEvent> events);
}
事件对象:
public class AppEvent {
private String eventId;
private String module;
private String page;
private String scenario;
private String eventType;
private String eventName;
private String description;
private String triggerSource;
private String remark;
private Long userId;
private String deviceId;
private String sessionId;
private String platform;
private String appVersion;
private String osVersion;
private String networkType;
private String channel;
private Long clientTime;
private Long serverTime;
private String env;
private Map<String, Object> params;
}
3.2.4 SDK 内部代码结构与实现要点
- 包结构示例:
com.video.tracking
├── annotation
│ └── BizEvent.java // 埋点注解定义
├── aspect
│ └── BizEventAspect.java // AOP 切面,实现注解拦截
├── client
│ ├── TrackingClient.java // 埋点客户端接口
│ └── HttpTrackingClient.java// 基于 HTTP 的默认实现
├── buffer
│ └── AsyncEventBuffer.java // 异步缓冲队列与发送线程
├── model
│ └── AppEvent.java // 事件模型
├── config
│ ├── TrackingProperties.java// SDK 配置(批量大小、间隔、重试等)
│ └── TrackingConstants.java // 常量定义
└── autoconfigure
└── TrackingAutoConfiguration.java // Spring Boot 自动装配
-
关键类职责:
- BizEventAspect:拦截标注了
@BizEvent的方法,从注解与上下文中构建AppEvent,交给TrackingClient处理。 - AsyncEventBuffer:在内存中维护无锁队列/高性能队列,提供
offer(AppEvent)、drainTo(List<AppEvent> batch)等方法,并管理后台发送线程生命周期。 - HttpTrackingClient:持有
AsyncEventBuffer,对外暴露track/trackBatch方法,内部将事件放入缓冲队列,由发送线程按批次调用采集服务 HTTP 接口。 - TrackingAutoConfiguration:基于 Spring Boot 自动装配机制,创建默认的
TrackingClient、AsyncEventBuffer、RestTemplate/WebClient等 Bean,并读取配置中心/配置文件中的埋点相关配置。
- BizEventAspect:拦截标注了
-
线程模型与队列策略:
- 使用 无锁队列或 Disruptor / MPSC 队列 缓存事件,避免锁竞争。
- 后台线程周期性(如每 200ms 或条数阈值 100 条)将事件聚合成批量请求发送至采集服务。
- 支持本地内存队列溢出时的降级策略(丢弃 / 写本地文件),并记录关键指标用于监控。
3.2.5 关键类骨架代码示例
以下示例代码仅为 SDK 实现骨架,实际项目可根据需要裁剪与扩展。
BizEventAspect.java
package com.video.tracking.aspect;
import com.video.tracking.annotation.BizEvent;
import com.video.tracking.client.TrackingClient;
import com.video.tracking.model.AppEvent;
import com.video.UserContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class BizEventAspect {
@Autowired
private TrackingClient trackingClient;
@Around("@annotation(com.video.tracking.annotation.BizEvent)")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
long start = System.currentTimeMillis();
Object result = null;
Throwable ex = null;
try {
result = pjp.proceed();
return result;
} catch (Throwable e) {
ex = e;
throw e;
} finally {
try {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
BizEvent bizEvent = method.getAnnotation(BizEvent.class);
if (bizEvent != null) {
AppEvent event = buildEvent(bizEvent);
// 根据需要可以追加 params,如请求参数、返回值等
trackingClient.track(event);
}
} catch (Exception ignore) {
// 埋点失败不影响主流程
}
}
}
private AppEvent buildEvent(BizEvent bizEvent) {
// 这里示例性从 UserContext 取公共字段,具体视项目而定
Long userId = UserContext.getUserId();
String deviceId = UserContext.getDeviceId();
String platform = UserContext.getPlatform();
AppEvent event = new AppEvent();
event.setModule(bizEvent.module());
event.setPage(bizEvent.page());
event.setScenario(bizEvent.scenario());
event.setEventName(bizEvent.eventName());
event.setEventType(bizEvent.eventType());
event.setDescription(bizEvent.description());
event.setTriggerSource(bizEvent.triggerSource());
event.setUserId(userId);
event.setDeviceId(deviceId);
event.setPlatform(platform);
event.setClientTime(System.currentTimeMillis());
return event;
}
}
AsyncEventBuffer.java
package com.video.tracking.buffer;
import com.video.tracking.model.AppEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class AsyncEventBuffer {
private final BlockingQueue<AppEvent> queue;
private final int batchSize;
private final long flushIntervalMillis;
private final Consumer<List<AppEvent>> batchConsumer;
private volatile boolean running = false;
private Thread workerThread;
public AsyncEventBuffer(int capacity,
int batchSize,
long flushIntervalMillis,
Consumer<List<AppEvent>> batchConsumer) {
this.queue = new LinkedBlockingQueue<>(capacity);
this.batchSize = batchSize;
this.flushIntervalMillis = flushIntervalMillis;
this.batchConsumer = batchConsumer;
}
public boolean offer(AppEvent event) {
// 队列满时可视情况返回 false 或直接丢弃
return queue.offer(event);
}
public void start() {
if (running) {
return;
}
running = true;
workerThread = new Thread(this::runLoop, "tracking-buffer-worker");
workerThread.setDaemon(true);
workerThread.start();
}
public void shutdown() {
running = false;
if (workerThread != null) {
workerThread.interrupt();
}
}
private void runLoop() {
List<AppEvent> batch = new ArrayList<>(batchSize);
long lastFlushTime = System.currentTimeMillis();
try {
while (running) {
long timeout = flushIntervalMillis - (System.currentTimeMillis() - lastFlushTime);
if (timeout <= 0) {
flushBatch(batch);
lastFlushTime = System.currentTimeMillis();
continue;
}
AppEvent event = queue.poll(timeout, TimeUnit.MILLISECONDS);
if (event != null) {
batch.add(event);
if (batch.size() >= batchSize) {
flushBatch(batch);
lastFlushTime = System.currentTimeMillis();
}
} else {
// 超时无新事件,也需要刷新已有批次
flushBatch(batch);
lastFlushTime = System.currentTimeMillis();
}
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
// 退出前尽量刷掉剩余事件
flushBatch(batch);
}
}
private void flushBatch(List<AppEvent> batch) {
if (batch.isEmpty()) {
return;
}
try {
batchConsumer.accept(new ArrayList<>(batch));
} catch (Exception e) {
// 可在此处记录日志,避免影响主线程
} finally {
batch.clear();
}
}
}
HttpTrackingClient.java
package com.video.tracking.client;
import com.video.tracking.buffer.AsyncEventBuffer;
import com.video.tracking.model.AppEvent;
import com.video.tracking.config.TrackingProperties;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HttpTrackingClient implements TrackingClient {
private final AsyncEventBuffer eventBuffer;
private final RestTemplate restTemplate;
private final TrackingProperties properties;
public HttpTrackingClient(AsyncEventBuffer eventBuffer,
RestTemplate restTemplate,
TrackingProperties properties) {
this.eventBuffer = eventBuffer;
this.restTemplate = restTemplate;
this.properties = properties;
this.eventBuffer.start();
}
@Override
public void track(AppEvent event) {
// 丢弃返回 false 的事件可计数上报,用于监控
eventBuffer.offer(event);
}
@Override
public void trackBatch(List<AppEvent> events) {
if (events == null || events.isEmpty()) {
return;
}
// 直接走批量上报接口
sendBatch(events);
}
private void sendBatch(List<AppEvent> batch) {
String url = properties.getIngestBaseUrl() + "/api/track/v1/events/batch";
Map<String, Object> body = new HashMap<>();
body.put("events", batch);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> request = new HttpEntity<>(body, headers);
try {
restTemplate.postForEntity(url, request, String.class);
} catch (Exception e) {
// 网络异常等情况可在此处增加重试 / 降级逻辑
}
}
}
TrackingProperties.java
package com.video.tracking.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "tracking")
public class TrackingProperties {
/** 采集服务基础地址 */
private String ingestBaseUrl;
/** 内存队列容量 */
private int bufferCapacity = 10000;
/** 批量大小 */
private int batchSize = 100;
/** 刷新间隔(毫秒) */
private long flushIntervalMillis = 200L;
// getter / setter 省略
public String getIngestBaseUrl() {
return ingestBaseUrl;
}
public void setIngestBaseUrl(String ingestBaseUrl) {
this.ingestBaseUrl = ingestBaseUrl;
}
public int getBufferCapacity() {
return bufferCapacity;
}
public void setBufferCapacity(int bufferCapacity) {
this.bufferCapacity = bufferCapacity;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public long getFlushIntervalMillis() {
return flushIntervalMillis;
}
public void setFlushIntervalMillis(long flushIntervalMillis) {
this.flushIntervalMillis = flushIntervalMillis;
}
}
TrackingAutoConfiguration.java
package com.video.tracking.autoconfigure;
import com.video.tracking.buffer.AsyncEventBuffer;
import com.video.tracking.client.HttpTrackingClient;
import com.video.tracking.client.TrackingClient;
import com.video.tracking.config.TrackingProperties;
import com.video.tracking.model.AppEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
import java.util.List;
@Configuration
@EnableConfigurationProperties(TrackingProperties.class)
public class TrackingAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RestTemplate trackingRestTemplate() {
return new RestTemplate();
}
@Bean
@ConditionalOnMissingBean
public AsyncEventBuffer asyncEventBuffer(TrackingProperties properties,
RestTemplate trackingRestTemplate) {
return new AsyncEventBuffer(
properties.getBufferCapacity(),
properties.getBatchSize(),
properties.getFlushIntervalMillis(),
(List<AppEvent> batch) -> {
// 这里留空,由 HttpTrackingClient 通过组合方式复用;
// 实际项目中可改为更灵活的设计
}
);
}
@Bean
@ConditionalOnMissingBean
public TrackingClient trackingClient(AsyncEventBuffer asyncEventBuffer,
RestTemplate trackingRestTemplate,
TrackingProperties properties) {
return new HttpTrackingClient(asyncEventBuffer, trackingRestTemplate, properties);
}
}
3.3 埋点采集服务 API 设计
对外统一提供 HTTP 接口,由内部 SDK 调用:
-
单条上报:
- URL:
POST /api/track/v1/event - Body:
AppEventJSON
- URL:
-
批量上报(推荐):
- URL:
POST /api/track/v1/events/batch - Body:
{"events": [AppEvent, ...]}
- URL:
采集服务职责:
- 校验必填字段(eventName、eventType、module、page、clientTime 等)。
- 补充服务端时间、环境信息。
- 记录接收日志(用于排查丢数问题)。
- 将事件批量写入 MQ 对应 Topic/Queue,如
app_event_log。
4. 数据模型与 ClickHouse 表设计
4.1 逻辑数据模型
统一埋点明细表主字段:
- event_id String
- event_name String
- event_type String
- module String
- page String
- scenario String
- description String
- trigger_source String
- remark String
- user_id UInt64
- device_id String
- session_id String
- platform LowCardinality(String)
- app_version LowCardinality(String)
- os_version LowCardinality(String)
- network_type LowCardinality(String)
- channel LowCardinality(String)
- client_time DateTime
- server_time DateTime
- event_date Date(分区字段,通常由 server_time 提取)
- env LowCardinality(String)
- params String / JSON 字段(建议使用 JSON 字符串 + ClickHouse JSON 相关函数)
4.2 ClickHouse 明细表设计
示例建表语句(本地表):
CREATE TABLE IF NOT EXISTS app_event_log_local (
event_date Date DEFAULT toDate(server_time),
server_time DateTime DEFAULT now(),
client_time DateTime,
event_id String,
event_name String,
event_type LowCardinality(String),
module LowCardinality(String),
page LowCardinality(String),
scenario LowCardinality(String),
description String,
trigger_source LowCardinality(String),
remark String,
user_id UInt64,
device_id String,
session_id String,
platform LowCardinality(String),
app_version LowCardinality(String),
os_version LowCardinality(String),
network_type LowCardinality(String),
channel LowCardinality(String),
env LowCardinality(String),
params String
)
ENGINE = MergeTree
PARTITION BY event_date
ORDER BY (event_name, event_date, user_id, device_id, server_time)
SETTINGS index_granularity = 8192;
如需高可用,可使用 ReplicatedMergeTree + 分布式表:
CREATE TABLE app_event_log ON CLUSTER cluster_1 AS app_event_log_local
ENGINE = Distributed(cluster_1, default, app_event_log_local, rand());
4.3 预聚合统计表
针对高频查询场景设计物化视图和聚合表,例如:
- 日活表
app_event_dau:按日期、平台、渠道统计 UV。 - 漏斗转化表
app_funnel_stats:首屏 → 视频聊天 → 支付 等关键步骤转化率。 - 行为次数表
app_event_count:按模块/事件名统计次数。
示例:UV 统计物化视图:
CREATE TABLE app_dau_local (
dt Date,
platform LowCardinality(String),
channel LowCardinality(String),
uv UInt64
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, platform, channel);
CREATE MATERIALIZED VIEW mv_app_dau_local
TO app_dau_local AS
SELECT
event_date AS dt,
platform,
channel,
uniqState(user_id) AS uv
FROM app_event_log_local
GROUP BY dt, platform, channel;
4.4 ClickHouse 建表与物化视图清单
为便于工程落地,整理本系统需要的主要 ClickHouse 表与物化视图示例,可按业务需要精简或扩展。
-
1)埋点明细表(本地表)
app_event_log_local- 结构定义见 4.2 节示例,建议在生产环境中使用
ReplicatedMergeTree以支持多副本与故障恢复。
- 结构定义见 4.2 节示例,建议在生产环境中使用
-
2)埋点明细分布式表
app_event_log
CREATE TABLE IF NOT EXISTS app_event_log ON CLUSTER cluster_1 AS app_event_log_local
ENGINE = Distributed(
cluster_1,
default,
app_event_log_local,
cityHash64(event_date, event_name)
);
- 3)日活统计表
app_dau_local与物化视图mv_app_dau_local- 用于按日期、平台、渠道统计 UV(已在 4.3 节给出示例)。
CREATE TABLE IF NOT EXISTS app_dau_local (
dt Date,
platform LowCardinality(String),
channel LowCardinality(String),
uv_state AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, platform, channel);
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_dau_local
TO app_dau_local AS
SELECT
event_date AS dt,
platform,
channel,
uniqState(user_id) AS uv_state
FROM app_event_log_local
GROUP BY dt, platform, channel;
- 4)事件次数统计表
app_event_count_local与物化视图mv_app_event_count_local- 用于统计各模块/页面/事件的 PV、点击量等。
CREATE TABLE IF NOT EXISTS app_event_count_local (
event_date Date,
module LowCardinality(String),
page LowCardinality(String),
event_name LowCardinality(String),
event_type LowCardinality(String),
cnt UInt64
)
ENGINE = SummingMergeTree
PARTITION BY event_date
ORDER BY (event_date, module, page, event_name, event_type);
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_event_count_local
TO app_event_count_local AS
SELECT
event_date,
module,
page,
event_name,
event_type,
count() AS cnt
FROM app_event_log_local
GROUP BY event_date, module, page, event_name, event_type;
- 5)漏斗统计表
app_funnel_stats_local与物化视图mv_app_funnel_stats_local- 以漏斗名称 + 步骤标识为维度,统计每一步的 UV,用于分析“首屏 → 视频聊天 → 支付”等关键步骤转化情况。
CREATE TABLE IF NOT EXISTS app_funnel_stats_local (
dt Date,
funnel_name LowCardinality(String),
step_order UInt8,
step_name LowCardinality(String),
uv_state AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, funnel_name, step_order);
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_funnel_stats_local
TO app_funnel_stats_local AS
SELECT
event_date AS dt,
/* 漏斗名称与步骤依赖离线配置表或注释约定,这里仅示意 */
'video_chat_pay_funnel' AS funnel_name,
multiIf(
event_name = 'app_launch', 1,
event_name = 'video_chat_enter', 2,
event_name = 'pay_success', 3,
0
) AS step_order,
multiIf(
step_order = 1, '启动',
step_order = 2, '进入视频聊天',
step_order = 3, '支付成功',
'其他'
) AS step_name,
uniqState(user_id) AS uv_state
FROM app_event_log_local
WHERE event_name IN ('app_launch', 'video_chat_enter', 'pay_success')
GROUP BY dt, funnel_name, step_order, step_name;
- 6)留存统计表
app_retention_stats_local与物化视图mv_app_retention_stats_local(可选)- 用于统计 D+1、D+7 等留存情况,此处给出简化示例,实际工程中可通过离线任务生成。
CREATE TABLE IF NOT EXISTS app_retention_stats_local (
first_visit_date Date,
retention_day UInt16, -- 1, 7, 30 等
platform LowCardinality(String),
channel LowCardinality(String),
uv_state AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY first_visit_date
ORDER BY (first_visit_date, retention_day, platform, channel);
说明:留存通常依赖“首次访问日期 + 后续活跃行为”的多表关联计算,可由离线任务或流式任务将结果写入
app_retention_stats_local,不必强行通过单一物化视图完成。
4.5 典型分析查询示例
下面给出若干常见分析场景的 ClickHouse 查询示例,便于运营/数据开发直接复用:
- 查询指定日期范围内的 DAU(按平台、渠道)
SELECT
dt,
platform,
channel,
sum(uniqMerge(uv_state)) AS dau
FROM app_dau_local
WHERE dt BETWEEN toDate('2025-01-01') AND toDate('2025-01-07')
GROUP BY dt, platform, channel
ORDER BY dt, platform, channel;
- 查询某一渠道在某日的视频聊天漏斗转化情况
SELECT
dt,
funnel_name,
step_order,
step_name,
uniqMerge(uv_state) AS uv
FROM app_funnel_stats_local
WHERE dt = toDate('2025-01-01')
AND funnel_name = 'video_chat_pay_funnel'
AND channel = 'appstore'
GROUP BY dt, funnel_name, step_order, step_name
ORDER BY step_order;
说明:如需支持按渠道区分,可在
app_funnel_stats_local中增加channel字段并在 MV 中补充。
- 查询某模块/页面/事件的 PV 变化趋势(近 7 天)
SELECT
event_date,
module,
page,
event_name,
sum(cnt) AS pv
FROM app_event_count_local
WHERE event_date >= today() - 7
AND module = '首页'
AND page = '首页'
AND event_name = 'app_launch'
GROUP BY event_date, module, page, event_name
ORDER BY event_date;
- 基于明细表按用户维度统计“视频聊天”相关行为次数
SELECT
event_date,
user_id,
countIf(event_name = 'video_chat_enter') AS enter_cnt,
countIf(event_name = 'video_chat_message') AS msg_cnt
FROM app_event_log_local
WHERE event_date = today() - 1
AND module = '视频'
GROUP BY event_date, user_id
ORDER BY msg_cnt DESC
LIMIT 100;
- 按版本对比某日关键转化事件(如支付成功)分布
SELECT
event_date,
app_version,
count() AS pay_success_cnt
FROM app_event_log_local
WHERE event_date = toDate('2025-01-01')
AND event_name = 'pay_success'
GROUP BY event_date, app_version
ORDER BY pay_success_cnt DESC;
5. 数据采集与传输流程
5.1 典型链路
- 业务代码触发:用户在 APP 内触发某事件(如首页“立即开始聊天按钮”点击)。
- SDK 采集:
- AOP/显式调用生成
AppEvent对象,填入对应字段(module=首页, page=首页, eventName=app_start_chat 等)。 - 将事件放入 SDK 内部内存队列。
- AOP/显式调用生成
- SDK 批量上报:后台线程按「数量阈值 + 时间窗口」策略批量调用
/events/batch接口。 - 采集服务接收:
- 校验参数、补齐公共字段。
- 将事件批量发送至 MQ(如 RabbitMQ 交换机
exchange.app.event)。
- 消费者服务写库:
- 从 MQ 批量拉取,按批量 INSERT 写入 ClickHouse
app_event_log表。 - 对写入失败的批次记录日志并重试,必要时写入失败队列或本地文件。
- 从 MQ 批量拉取,按批量 INSERT 写入 ClickHouse
- 分析与查询:
- 物化视图自动增量刷新统计表。
- 报表/BI/运营系统通过分析 API 查询 ClickHouse。
5.2 与现有 RabbitMQ 的集成
- 复用项目中已有的
RabbitMQConfig配置,新建埋点交换机、队列:- 交换机:
exchange.app.event - 队列:
queue.app.event.log - 路由键:按环境或业务模块区分,如
event.prod.*。
- 交换机:
- 采集服务作为生产者,埋点消费者服务作为消费者,两者均基于 Spring Boot AMQP 实现。
6. 数据分析与统计功能设计
6.1 用户行为分析
- 路径分析:从启动 → 首页 → 搜索 → 视频 → 支付的完整行为链路。
- 漏斗分析:如“视频开始 → 对话额度弹窗展示 → 开通会员”的转化率。
- 留存分析:按首次访问日期统计次日/7 日/30 日留存。
- 分群分析:按渠道、平台、版本、用户属性维度切分行为数据。
6.2 业务指标统计
结合 Excel 中的关键事件设计指标:
- 启动相关指标:
app_launch次数、冷启动时长分布。 - 视频相关指标:视频播放完成人数、对话数量、人均对话轮数、视频结算完成人数等。
- AI 朋友相关指标:AI 朋友关注率、聊天频次、照片/视频查看次数、分享行为。
- 礼物系统指标:送礼人数、送礼次数、人均送礼金额、命定礼物分享转化率。
- 会员/支付指标:会员开通率、会员续费率、支付成功率、支付失败原因分布。
6.3 查询与报表实现
- 通过 ClickHouse SQL 或 BI 工具(如 Superset/Grafana)配置固定报表。
- 对接内部运营平台:
- 提供 REST API(如
/api/metrics/dau、/api/metrics/funnel)。 - 查询参数包括日期范围、平台、渠道、版本、用户属性等。
- 提供 REST API(如
7. 性能优化与可靠性策略
7.1 对业务代码的影响最小化
-
非阻塞异步上报:
- SDK 将埋点写入内存队列后立即返回,不阻塞业务线程。
- QPS 较高时,严格控制单次入队耗时(如不做复杂序列化和 IO)。
-
批量发送:
- 按条数阈值(如 100 条)和时间阈值(如 200ms)组合触发。
- 支持按模块/事件类型分批,避免单批过大影响网络抖动。
-
轻量对象与序列化:
- 事件对象采用轻量字段类型;
- 使用高效 JSON 序列化库(如 Jackson 配置复用、禁用反射开销大的特性)。
7.2 传输与写入性能
-
HTTP 层优化:
- 采集服务支持 Keep-Alive、连接池、压缩(如 gzip)。
- 请求/响应体格式稳定,避免频繁变更导致客户端升级成本上升。
-
MQ 层优化:
- 批量确认、合理配置 prefetch count。
- 使用独立埋点队列,保障与业务消息隔离。
-
ClickHouse 写入优化:
- 优先使用批量 INSERT(如每批 1000~5000 条)。
- 根据查询模式合理设计
ORDER BY和分区键。 - 使用
LowCardinality减少存储和内存占用。
7.3 降级与容错
-
SDK 降级:
- 内存队列满时,可按策略丢弃新增事件或仅保留关键事件。
- 网络异常时,短时间内重试;超过阈值后进入静默期,防止雪崩。
-
采集服务降级:
- 扩容为多实例部署,前接网关或负载均衡。
- 临时关闭非关键校验以提升吞吐。
-
消费者与 ClickHouse 降级:
- 写入失败时,按批记录到失败队列或本地备份表,用于补数。
- 出现严重问题时,临时停止消费,优先保障业务写 MQ 能力。
8. 监控与告警设计
8.1 监控指标
-
SDK 侧:
- 队列长度、丢弃事件数量。
- 上报 QPS、成功率、平均耗时。
-
采集服务:
- 请求 QPS、耗时分布、非 2xx 比例。
- 入 MQ 成功率、异常日志量。
-
MQ 层:
- 队列消息堆积量、延迟时间。
- 消费速率与生产速率差值。
-
消费者 & ClickHouse:
- 批量写入耗时、失败次数、重试次数。
- ClickHouse 节点 CPU、内存、磁盘、查询耗时、拒绝率。
-
分析 API:
- 接口 QPS、响应时间、错误率。
8.2 告警策略
- 为上述指标设置阈值与告警规则,例如:
- SDK 丢弃率 > 0.1% 持续 5 分钟。
- 采集服务错误率 > 1%。
- MQ 队列堆积超过 N 万条或延迟超过 5 分钟。
- ClickHouse 写入失败批次数连续 N 次。
- 告警渠道:邮件 + IM 群(如企业微信 / 飞书)+ 值班短信。
- 告警分级:P0(影响核心数据可靠性)、P1(影响部分统计)、P2(预警)。
9. 与现有业务系统的集成方案
9.1 系统现状简述
- 项目基于 Spring Boot 构建,已集成 Redis、RabbitMQ、MyBatis 等组件。
- 业务模块涵盖:视频、视频、礼物系统、会员系统、私信系统等。
9.2 集成方式
-
新增埋点模块:
- 在后端工程中新建
tracking模块(包名如com.video.tracking),包含:- 注解定义与 AOP 切面。
TrackingClient接口及默认实现。- 采集服务客户端(调用
/api/track/v1/events/batch)。
- 在后端工程中新建
-
与现有过滤器/拦截器集成:
- 在已有登录 / 鉴权过滤器中,统一抽取
userId、deviceId、platform等信息存入UserContext。 - 埋点 AOP 和业务代码从
UserContext读取公共字段,保证一致性。
- 在已有登录 / 鉴权过滤器中,统一抽取
-
RabbitMQ 配置复用:
- 通过现有
RabbitMQConfig新增埋点队列和交换机配置,避免重复连接管理。
- 通过现有
-
ClickHouse 集成:
- 新增 ClickHouse 数据源配置,用于埋点消费者服务和分析服务访问。
- 可独立为一个 Spring Boot 服务,与主业务服务解耦,避免影响核心交易链路。
-
前后端协议约定:
- 对于前端直接埋点(如 H5),统一使用同一 HTTP 协议字段格式,必要时通过网关转发至埋点采集服务。
9.3 接入步骤示例
- 搭建 ClickHouse 集群并创建埋点表结构。
- 新增埋点采集服务(可作为独立 Spring Boot 应用部署)。
- 在主业务服务中引入埋点 SDK 模块,并完成配置(采集服务地址、批量阈值等)。
- 对关键业务流程加注
@BizEvent注解:- 启动 / 首页曝光;
- 视频开始/结束、对话发送、AI 回复;
- 礼物送出、钻石充值;
- 会员开通/续费、支付成功/失败等。
- 搭建埋点消费者服务,配置 MQ 消费组,写入 ClickHouse。
- 搭建监控与告警,接入现有监控系统(如 Prometheus + Grafana)。
- 和产品/运营协同验证数据,确保事件数、统计口径与预期一致。
10. 扩展与演进规划
-
多租户/多应用支持:
- 在事件模型中增加
app_id/tenant_id字段,支持多个 APP 共用一套埋点平台。
- 在事件模型中增加
-
隐私与合规:
- 对用户敏感数据进行脱敏或不入库,确保符合隐私政策与用户协议。
- 结合现有“通用数据脱敏设计方案”与“隐私政策与用户协议管理系统设计”。
-
智能分析:
- 后续可基于埋点数据进行用户分群、推荐及 A/B 实验等高级分析。
-
数据回流业务:
- 将分析结果回流业务系统,用于精细化运营和个性化推荐,如:对高价值用户推送特定视频或会员优惠。
本设计文档为后端埋点系统的总体方案,后续可根据实际实现情况补充更详细的接口定义、表结构变更记录及运维手册。
更多推荐


所有评论(0)