1. 背景与目标

  • 背景:当前缺乏系统化的业务埋点体系,无法对用户行为和业务指标进行精细化分析。项目已提供埋点规范文件,定义了主要模块、页面、场景及事件。
  • 目标:在不显著影响业务性能的前提下,构建一套基于 Spring Boot 的统一埋点系统,实现从 埋点采集 → 传输 → 存储 → 分析 → 监控告警 的完整闭环,并以 ClickHouse 作为核心 OLAP 存储。

2. 系统整体架构设计

2.1 功能模块
  • 埋点 SDK(Server-Side SDK)

    • 提供注解 + API 两种接入方式。
    • 屏蔽埋点协议、批量发送、重试等复杂细节。
    • 支持异步、缓冲队列和本地降级策略。
  • 埋点采集服务(Ingestion API)

    • 提供 HTTP 接口接收埋点事件(单条 / 批量)。
    • 做基础校验、补充公共字段、落消息队列或本地缓冲。
  • 消息队列层(RabbitMQ/Kafka)

    • 解耦业务系统与存储,防止 ClickHouse 抖动反向影响业务。
    • 支持批量消费、重试队列和死信队列。
  • 埋点消费者服务(Consumer)

    • 从 MQ 批量拉取事件,写入 ClickHouse 明细表。
    • 支持批量 INSERT 与失败重试、告警。
  • ClickHouse 集群

    • 存储明细埋点数据 + 预聚合统计数据。
    • 提供高并发 OLAP 查询能力。
  • 分析与查询服务

    • 业务指标 API:UV、转化率、漏斗、留存等。
    • 提供运营 / 产品侧可视化对接(BI、大屏)。
  • 监控与告警

    • 埋点链路健康监控:QPS、失败率、MQ 堆积、ClickHouse 写入错误等。
    • 告警分级推送(邮件 / IM / 短信)。
2.2 系统架构图(Mermaid)

移动端APP / H5

业务服务内埋点SDK

埋点采集服务(HTTP批量上报)

消息队列(RabbitMQ/Kafka)

埋点消费者服务

ClickHouse集群

实时分析(物化视图/流式任务)

离线分析(SQL/报表)

分析查询API服务

BI可视化/运营后台

监控&告警(Prometheus/日志系统)

3. 埋点规范与 API 设计

3.1 埋点事件规范

基于比如某个app业务系统中中定义的字段,统一抽象为如下逻辑字段:

  • 模块 module:如“首页”“视频”“AI朋友”“礼物系统”“个人中心”“支付”等,对应 Excel 的“模块”。
  • 页面 page:功能页面名称,如“首页”“视频聊天”“会员中心”等,对应“页面”。
  • 场景 scenario:具体业务场景,如“刷视频视频”“送礼物”“支付页”等,对应“场景”。
  • 事件类型 event_type:view / click / submit / event,对应“事件类型”。
  • 事件名 event_name:用于技术侧唯一标识事件,如 app_launch,对应“事件名”。
  • 触发 trigger_source:触发方式说明,如“点击”“弹窗展示”等,对应“触发”。
  • 事件描述 description:业务方可读描述,对应“事件描述”。
  • 说明 remark:补充说明,如“进入登录页面”“发起支付”等,对应“说明”。
  • 参数 params:该事件特有的自定义属性,如搜索关键词、礼物ID等,对应“参数”,以 JSON 形式存储。

公共字段(由 SDK/服务统一补齐):

  • event_id:全局唯一 ID(如 UUID)。
  • user_id:登录用户 ID,游客为空或 0。
  • device_id:设备唯一标识。
  • session_id:会话 ID。
  • platform:平台,如 ios / android / web。
  • app_version:客户端版本号。
  • os_version:操作系统版本。
  • network_type:网络类型:wifi/4g/5g 等。
  • channel:渠道,如应用商店渠道标识。
  • server_time:服务端接收时间。
  • client_time:客户端上报时间。
  • env:环境,如 prod / test。
3.2 埋点 SDK 设计
3.2.1 接入方式
  • 注解方式(推荐)

    • 在 Controller / Service 方法上添加 @BizEvent 注解,自动完成埋点。
    • 适合固定流程事件(如“进入页面”“点击按钮”“提交表单”)。
  • 代码 API 方式

    • 适用于复杂逻辑或循环场景,业务代码可主动调用 TrackingClient.track(...)
3.2.2 埋点注解设计

示例注解:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface BizEvent {
    String module();
    String page();
    String scenario();
    String eventName();
    String eventType();   // view / click / submit / event
    String description() default "";
    String triggerSource() default "";
}

配套 AOP 切面自动采集:

  • UserContext 中获取 userIddeviceId 等信息。
  • 从请求中解析 platformappVersionchannel 等公共字段。
  • 支持配置是否采集入参 / 返回值中的业务字段作为 params
3.2.3 埋点 SDK API 设计

核心接口:

public interface TrackingClient {

    /**
     * 异步上报单条事件(默认方式)
     */
    void track(AppEvent event);

    /**
     * 异步批量上报事件
     */
    void trackBatch(List<AppEvent> events);
}

事件对象:

public class AppEvent {
    private String eventId;
    private String module;
    private String page;
    private String scenario;
    private String eventType;
    private String eventName;
    private String description;
    private String triggerSource;
    private String remark;

    private Long userId;
    private String deviceId;
    private String sessionId;
    private String platform;
    private String appVersion;
    private String osVersion;
    private String networkType;
    private String channel;

    private Long clientTime;
    private Long serverTime;
    private String env;

    private Map<String, Object> params;
}
3.2.4 SDK 内部代码结构与实现要点
  • 包结构示例:
com.video.tracking
  ├── annotation
  │   └── BizEvent.java          // 埋点注解定义
  ├── aspect
  │   └── BizEventAspect.java    // AOP 切面,实现注解拦截
  ├── client
  │   ├── TrackingClient.java    // 埋点客户端接口
  │   └── HttpTrackingClient.java// 基于 HTTP 的默认实现
  ├── buffer
  │   └── AsyncEventBuffer.java  // 异步缓冲队列与发送线程
  ├── model
  │   └── AppEvent.java          // 事件模型
  ├── config
  │   ├── TrackingProperties.java// SDK 配置(批量大小、间隔、重试等)
  │   └── TrackingConstants.java // 常量定义
  └── autoconfigure
      └── TrackingAutoConfiguration.java // Spring Boot 自动装配
  • 关键类职责:

    • BizEventAspect:拦截标注了 @BizEvent 的方法,从注解与上下文中构建 AppEvent,交给 TrackingClient 处理。
    • AsyncEventBuffer:在内存中维护无锁队列/高性能队列,提供 offer(AppEvent)drainTo(List<AppEvent> batch) 等方法,并管理后台发送线程生命周期。
    • HttpTrackingClient:持有 AsyncEventBuffer,对外暴露 track / trackBatch 方法,内部将事件放入缓冲队列,由发送线程按批次调用采集服务 HTTP 接口。
    • TrackingAutoConfiguration:基于 Spring Boot 自动装配机制,创建默认的 TrackingClientAsyncEventBufferRestTemplate/WebClient 等 Bean,并读取配置中心/配置文件中的埋点相关配置。
  • 线程模型与队列策略:

    • 使用 无锁队列或 Disruptor / MPSC 队列 缓存事件,避免锁竞争。
    • 后台线程周期性(如每 200ms 或条数阈值 100 条)将事件聚合成批量请求发送至采集服务。
    • 支持本地内存队列溢出时的降级策略(丢弃 / 写本地文件),并记录关键指标用于监控。
3.2.5 关键类骨架代码示例

以下示例代码仅为 SDK 实现骨架,实际项目可根据需要裁剪与扩展。

BizEventAspect.java

package com.video.tracking.aspect;

import com.video.tracking.annotation.BizEvent;
import com.video.tracking.client.TrackingClient;
import com.video.tracking.model.AppEvent;
import com.video.UserContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect
@Component
public class BizEventAspect {

    @Autowired
    private TrackingClient trackingClient;

    @Around("@annotation(com.video.tracking.annotation.BizEvent)")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        long start = System.currentTimeMillis();
        Object result = null;
        Throwable ex = null;
        try {
            result = pjp.proceed();
            return result;
        } catch (Throwable e) {
            ex = e;
            throw e;
        } finally {
            try {
                MethodSignature signature = (MethodSignature) pjp.getSignature();
                Method method = signature.getMethod();
                BizEvent bizEvent = method.getAnnotation(BizEvent.class);
                if (bizEvent != null) {
                    AppEvent event = buildEvent(bizEvent);
                    // 根据需要可以追加 params,如请求参数、返回值等
                    trackingClient.track(event);
                }
            } catch (Exception ignore) {
                // 埋点失败不影响主流程
            }
        }
    }

    private AppEvent buildEvent(BizEvent bizEvent) {
        // 这里示例性从 UserContext 取公共字段,具体视项目而定
        Long userId = UserContext.getUserId();
        String deviceId = UserContext.getDeviceId();
        String platform = UserContext.getPlatform();

        AppEvent event = new AppEvent();
        event.setModule(bizEvent.module());
        event.setPage(bizEvent.page());
        event.setScenario(bizEvent.scenario());
        event.setEventName(bizEvent.eventName());
        event.setEventType(bizEvent.eventType());
        event.setDescription(bizEvent.description());
        event.setTriggerSource(bizEvent.triggerSource());
        event.setUserId(userId);
        event.setDeviceId(deviceId);
        event.setPlatform(platform);
        event.setClientTime(System.currentTimeMillis());
        return event;
    }
}

AsyncEventBuffer.java

package com.video.tracking.buffer;

import com.video.tracking.model.AppEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncEventBuffer {

    private final BlockingQueue<AppEvent> queue;
    private final int batchSize;
    private final long flushIntervalMillis;
    private final Consumer<List<AppEvent>> batchConsumer;

    private volatile boolean running = false;
    private Thread workerThread;

    public AsyncEventBuffer(int capacity,
                            int batchSize,
                            long flushIntervalMillis,
                            Consumer<List<AppEvent>> batchConsumer) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.batchConsumer = batchConsumer;
    }

    public boolean offer(AppEvent event) {
        // 队列满时可视情况返回 false 或直接丢弃
        return queue.offer(event);
    }

    public void start() {
        if (running) {
            return;
        }
        running = true;
        workerThread = new Thread(this::runLoop, "tracking-buffer-worker");
        workerThread.setDaemon(true);
        workerThread.start();
    }

    public void shutdown() {
        running = false;
        if (workerThread != null) {
            workerThread.interrupt();
        }
    }

    private void runLoop() {
        List<AppEvent> batch = new ArrayList<>(batchSize);
        long lastFlushTime = System.currentTimeMillis();
        try {
            while (running) {
                long timeout = flushIntervalMillis - (System.currentTimeMillis() - lastFlushTime);
                if (timeout <= 0) {
                    flushBatch(batch);
                    lastFlushTime = System.currentTimeMillis();
                    continue;
                }
                AppEvent event = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (event != null) {
                    batch.add(event);
                    if (batch.size() >= batchSize) {
                        flushBatch(batch);
                        lastFlushTime = System.currentTimeMillis();
                    }
                } else {
                    // 超时无新事件,也需要刷新已有批次
                    flushBatch(batch);
                    lastFlushTime = System.currentTimeMillis();
                }
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        } finally {
            // 退出前尽量刷掉剩余事件
            flushBatch(batch);
        }
    }

    private void flushBatch(List<AppEvent> batch) {
        if (batch.isEmpty()) {
            return;
        }
        try {
            batchConsumer.accept(new ArrayList<>(batch));
        } catch (Exception e) {
            // 可在此处记录日志,避免影响主线程
        } finally {
            batch.clear();
        }
    }
}

HttpTrackingClient.java

package com.video.tracking.client;

import com.video.tracking.buffer.AsyncEventBuffer;
import com.video.tracking.model.AppEvent;
import com.video.tracking.config.TrackingProperties;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HttpTrackingClient implements TrackingClient {

    private final AsyncEventBuffer eventBuffer;
    private final RestTemplate restTemplate;
    private final TrackingProperties properties;

    public HttpTrackingClient(AsyncEventBuffer eventBuffer,
                              RestTemplate restTemplate,
                              TrackingProperties properties) {
        this.eventBuffer = eventBuffer;
        this.restTemplate = restTemplate;
        this.properties = properties;
        this.eventBuffer.start();
    }

    @Override
    public void track(AppEvent event) {
        // 丢弃返回 false 的事件可计数上报,用于监控
        eventBuffer.offer(event);
    }

    @Override
    public void trackBatch(List<AppEvent> events) {
        if (events == null || events.isEmpty()) {
            return;
        }
        // 直接走批量上报接口
        sendBatch(events);
    }

    private void sendBatch(List<AppEvent> batch) {
        String url = properties.getIngestBaseUrl() + "/api/track/v1/events/batch";
        Map<String, Object> body = new HashMap<>();
        body.put("events", batch);

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<Map<String, Object>> request = new HttpEntity<>(body, headers);

        try {
            restTemplate.postForEntity(url, request, String.class);
        } catch (Exception e) {
            // 网络异常等情况可在此处增加重试 / 降级逻辑
        }
    }
}

TrackingProperties.java

package com.video.tracking.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "tracking")
public class TrackingProperties {

    /** 采集服务基础地址 */
    private String ingestBaseUrl;

    /** 内存队列容量 */
    private int bufferCapacity = 10000;

    /** 批量大小 */
    private int batchSize = 100;

    /** 刷新间隔(毫秒) */
    private long flushIntervalMillis = 200L;

    // getter / setter 省略

    public String getIngestBaseUrl() {
        return ingestBaseUrl;
    }

    public void setIngestBaseUrl(String ingestBaseUrl) {
        this.ingestBaseUrl = ingestBaseUrl;
    }

    public int getBufferCapacity() {
        return bufferCapacity;
    }

    public void setBufferCapacity(int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public long getFlushIntervalMillis() {
        return flushIntervalMillis;
    }

    public void setFlushIntervalMillis(long flushIntervalMillis) {
        this.flushIntervalMillis = flushIntervalMillis;
    }
}

TrackingAutoConfiguration.java

package com.video.tracking.autoconfigure;

import com.video.tracking.buffer.AsyncEventBuffer;
import com.video.tracking.client.HttpTrackingClient;
import com.video.tracking.client.TrackingClient;
import com.video.tracking.config.TrackingProperties;
import com.video.tracking.model.AppEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Configuration
@EnableConfigurationProperties(TrackingProperties.class)
public class TrackingAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public RestTemplate trackingRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    @ConditionalOnMissingBean
    public AsyncEventBuffer asyncEventBuffer(TrackingProperties properties,
                                             RestTemplate trackingRestTemplate) {
        return new AsyncEventBuffer(
                properties.getBufferCapacity(),
                properties.getBatchSize(),
                properties.getFlushIntervalMillis(),
                (List<AppEvent> batch) -> {
                    // 这里留空,由 HttpTrackingClient 通过组合方式复用;
                    // 实际项目中可改为更灵活的设计
                }
        );
    }

    @Bean
    @ConditionalOnMissingBean
    public TrackingClient trackingClient(AsyncEventBuffer asyncEventBuffer,
                                         RestTemplate trackingRestTemplate,
                                         TrackingProperties properties) {
        return new HttpTrackingClient(asyncEventBuffer, trackingRestTemplate, properties);
    }
}
3.3 埋点采集服务 API 设计

对外统一提供 HTTP 接口,由内部 SDK 调用:

  • 单条上报

    • URLPOST /api/track/v1/event
    • BodyAppEvent JSON
  • 批量上报(推荐)

    • URLPOST /api/track/v1/events/batch
    • Body{"events": [AppEvent, ...]}

采集服务职责:

  • 校验必填字段(eventName、eventType、module、page、clientTime 等)。
  • 补充服务端时间、环境信息。
  • 记录接收日志(用于排查丢数问题)。
  • 将事件批量写入 MQ 对应 Topic/Queue,如 app_event_log

4. 数据模型与 ClickHouse 表设计

4.1 逻辑数据模型

统一埋点明细表主字段:

  • event_id String
  • event_name String
  • event_type String
  • module String
  • page String
  • scenario String
  • description String
  • trigger_source String
  • remark String
  • user_id UInt64
  • device_id String
  • session_id String
  • platform LowCardinality(String)
  • app_version LowCardinality(String)
  • os_version LowCardinality(String)
  • network_type LowCardinality(String)
  • channel LowCardinality(String)
  • client_time DateTime
  • server_time DateTime
  • event_date Date(分区字段,通常由 server_time 提取)
  • env LowCardinality(String)
  • params String / JSON 字段(建议使用 JSON 字符串 + ClickHouse JSON 相关函数)
4.2 ClickHouse 明细表设计

示例建表语句(本地表):

CREATE TABLE IF NOT EXISTS app_event_log_local (
    event_date       Date            DEFAULT toDate(server_time),
    server_time      DateTime        DEFAULT now(),
    client_time      DateTime,

    event_id         String,
    event_name       String,
    event_type       LowCardinality(String),
    module           LowCardinality(String),
    page             LowCardinality(String),
    scenario         LowCardinality(String),
    description      String,
    trigger_source   LowCardinality(String),
    remark           String,

    user_id          UInt64,
    device_id        String,
    session_id       String,
    platform         LowCardinality(String),
    app_version      LowCardinality(String),
    os_version       LowCardinality(String),
    network_type     LowCardinality(String),
    channel          LowCardinality(String),
    env              LowCardinality(String),

    params           String
)
ENGINE = MergeTree
PARTITION BY event_date
ORDER BY (event_name, event_date, user_id, device_id, server_time)
SETTINGS index_granularity = 8192;

如需高可用,可使用 ReplicatedMergeTree + 分布式表:

CREATE TABLE app_event_log ON CLUSTER cluster_1 AS app_event_log_local
ENGINE = Distributed(cluster_1, default, app_event_log_local, rand());
4.3 预聚合统计表

针对高频查询场景设计物化视图和聚合表,例如:

  • 日活表 app_event_dau:按日期、平台、渠道统计 UV。
  • 漏斗转化表 app_funnel_stats:首屏 → 视频聊天 → 支付 等关键步骤转化率。
  • 行为次数表 app_event_count:按模块/事件名统计次数。

示例:UV 统计物化视图:

CREATE TABLE app_dau_local (
    dt         Date,
    platform   LowCardinality(String),
    channel    LowCardinality(String),
    uv         UInt64
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, platform, channel);

CREATE MATERIALIZED VIEW mv_app_dau_local
TO app_dau_local AS
SELECT
    event_date AS dt,
    platform,
    channel,
    uniqState(user_id) AS uv
FROM app_event_log_local
GROUP BY dt, platform, channel;
4.4 ClickHouse 建表与物化视图清单

为便于工程落地,整理本系统需要的主要 ClickHouse 表与物化视图示例,可按业务需要精简或扩展。

  • 1)埋点明细表(本地表) app_event_log_local

    • 结构定义见 4.2 节示例,建议在生产环境中使用 ReplicatedMergeTree 以支持多副本与故障恢复。
  • 2)埋点明细分布式表 app_event_log

CREATE TABLE IF NOT EXISTS app_event_log ON CLUSTER cluster_1 AS app_event_log_local
ENGINE = Distributed(
    cluster_1,
    default,
    app_event_log_local,
    cityHash64(event_date, event_name)
);
  • 3)日活统计表 app_dau_local 与物化视图 mv_app_dau_local
    • 用于按日期、平台、渠道统计 UV(已在 4.3 节给出示例)。
CREATE TABLE IF NOT EXISTS app_dau_local (
    dt         Date,
    platform   LowCardinality(String),
    channel    LowCardinality(String),
    uv_state   AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, platform, channel);

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_dau_local
TO app_dau_local AS
SELECT
    event_date AS dt,
    platform,
    channel,
    uniqState(user_id) AS uv_state
FROM app_event_log_local
GROUP BY dt, platform, channel;
  • 4)事件次数统计表 app_event_count_local 与物化视图 mv_app_event_count_local
    • 用于统计各模块/页面/事件的 PV、点击量等。
CREATE TABLE IF NOT EXISTS app_event_count_local (
    event_date   Date,
    module       LowCardinality(String),
    page         LowCardinality(String),
    event_name   LowCardinality(String),
    event_type   LowCardinality(String),
    cnt          UInt64
)
ENGINE = SummingMergeTree
PARTITION BY event_date
ORDER BY (event_date, module, page, event_name, event_type);

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_event_count_local
TO app_event_count_local AS
SELECT
    event_date,
    module,
    page,
    event_name,
    event_type,
    count() AS cnt
FROM app_event_log_local
GROUP BY event_date, module, page, event_name, event_type;
  • 5)漏斗统计表 app_funnel_stats_local 与物化视图 mv_app_funnel_stats_local
    • 以漏斗名称 + 步骤标识为维度,统计每一步的 UV,用于分析“首屏 → 视频聊天 → 支付”等关键步骤转化情况。
CREATE TABLE IF NOT EXISTS app_funnel_stats_local (
    dt             Date,
    funnel_name    LowCardinality(String),
    step_order     UInt8,
    step_name      LowCardinality(String),
    uv_state       AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY dt
ORDER BY (dt, funnel_name, step_order);

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_app_funnel_stats_local
TO app_funnel_stats_local AS
SELECT
    event_date AS dt,
    /* 漏斗名称与步骤依赖离线配置表或注释约定,这里仅示意 */
    'video_chat_pay_funnel' AS funnel_name,
    multiIf(
        event_name = 'app_launch', 1,
        event_name = 'video_chat_enter', 2,
        event_name = 'pay_success', 3,
        0
    ) AS step_order,
    multiIf(
        step_order = 1, '启动',
        step_order = 2, '进入视频聊天',
        step_order = 3, '支付成功',
        '其他'
    ) AS step_name,
    uniqState(user_id) AS uv_state
FROM app_event_log_local
WHERE event_name IN ('app_launch', 'video_chat_enter', 'pay_success')
GROUP BY dt, funnel_name, step_order, step_name;
  • 6)留存统计表 app_retention_stats_local 与物化视图 mv_app_retention_stats_local(可选)
    • 用于统计 D+1、D+7 等留存情况,此处给出简化示例,实际工程中可通过离线任务生成。
CREATE TABLE IF NOT EXISTS app_retention_stats_local (
    first_visit_date   Date,
    retention_day      UInt16, -- 1, 7, 30 等
    platform           LowCardinality(String),
    channel            LowCardinality(String),
    uv_state           AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY first_visit_date
ORDER BY (first_visit_date, retention_day, platform, channel);

说明:留存通常依赖“首次访问日期 + 后续活跃行为”的多表关联计算,可由离线任务或流式任务将结果写入 app_retention_stats_local,不必强行通过单一物化视图完成。

4.5 典型分析查询示例

下面给出若干常见分析场景的 ClickHouse 查询示例,便于运营/数据开发直接复用:

  • 查询指定日期范围内的 DAU(按平台、渠道)
SELECT
    dt,
    platform,
    channel,
    sum(uniqMerge(uv_state)) AS dau
FROM app_dau_local
WHERE dt BETWEEN toDate('2025-01-01') AND toDate('2025-01-07')
GROUP BY dt, platform, channel
ORDER BY dt, platform, channel;
  • 查询某一渠道在某日的视频聊天漏斗转化情况
SELECT
    dt,
    funnel_name,
    step_order,
    step_name,
    uniqMerge(uv_state) AS uv
FROM app_funnel_stats_local
WHERE dt = toDate('2025-01-01')
  AND funnel_name = 'video_chat_pay_funnel'
  AND channel = 'appstore'
GROUP BY dt, funnel_name, step_order, step_name
ORDER BY step_order;

说明:如需支持按渠道区分,可在 app_funnel_stats_local 中增加 channel 字段并在 MV 中补充。

  • 查询某模块/页面/事件的 PV 变化趋势(近 7 天)
SELECT
    event_date,
    module,
    page,
    event_name,
    sum(cnt) AS pv
FROM app_event_count_local
WHERE event_date >= today() - 7
  AND module = '首页'
  AND page = '首页'
  AND event_name = 'app_launch'
GROUP BY event_date, module, page, event_name
ORDER BY event_date;
  • 基于明细表按用户维度统计“视频聊天”相关行为次数
SELECT
    event_date,
    user_id,
    countIf(event_name = 'video_chat_enter') AS enter_cnt,
    countIf(event_name = 'video_chat_message') AS msg_cnt
FROM app_event_log_local
WHERE event_date = today() - 1
  AND module = '视频'
GROUP BY event_date, user_id
ORDER BY msg_cnt DESC
LIMIT 100;
  • 按版本对比某日关键转化事件(如支付成功)分布
SELECT
    event_date,
    app_version,
    count() AS pay_success_cnt
FROM app_event_log_local
WHERE event_date = toDate('2025-01-01')
  AND event_name = 'pay_success'
GROUP BY event_date, app_version
ORDER BY pay_success_cnt DESC;

5. 数据采集与传输流程

5.1 典型链路
  1. 业务代码触发:用户在 APP 内触发某事件(如首页“立即开始聊天按钮”点击)。
  2. SDK 采集
    • AOP/显式调用生成 AppEvent 对象,填入对应字段(module=首页, page=首页, eventName=app_start_chat 等)。
    • 将事件放入 SDK 内部内存队列。
  3. SDK 批量上报:后台线程按「数量阈值 + 时间窗口」策略批量调用 /events/batch 接口。
  4. 采集服务接收
    • 校验参数、补齐公共字段。
    • 将事件批量发送至 MQ(如 RabbitMQ 交换机 exchange.app.event)。
  5. 消费者服务写库
    • 从 MQ 批量拉取,按批量 INSERT 写入 ClickHouse app_event_log 表。
    • 对写入失败的批次记录日志并重试,必要时写入失败队列或本地文件。
  6. 分析与查询
    • 物化视图自动增量刷新统计表。
    • 报表/BI/运营系统通过分析 API 查询 ClickHouse。
5.2 与现有 RabbitMQ 的集成
  • 复用项目中已有的 RabbitMQConfig 配置,新建埋点交换机、队列:
    • 交换机:exchange.app.event
    • 队列:queue.app.event.log
    • 路由键:按环境或业务模块区分,如 event.prod.*
  • 采集服务作为生产者,埋点消费者服务作为消费者,两者均基于 Spring Boot AMQP 实现。

6. 数据分析与统计功能设计

6.1 用户行为分析
  • 路径分析:从启动 → 首页 → 搜索 → 视频 → 支付的完整行为链路。
  • 漏斗分析:如“视频开始 → 对话额度弹窗展示 → 开通会员”的转化率。
  • 留存分析:按首次访问日期统计次日/7 日/30 日留存。
  • 分群分析:按渠道、平台、版本、用户属性维度切分行为数据。
6.2 业务指标统计

结合 Excel 中的关键事件设计指标:

  • 启动相关指标app_launch 次数、冷启动时长分布。
  • 视频相关指标:视频播放完成人数、对话数量、人均对话轮数、视频结算完成人数等。
  • AI 朋友相关指标:AI 朋友关注率、聊天频次、照片/视频查看次数、分享行为。
  • 礼物系统指标:送礼人数、送礼次数、人均送礼金额、命定礼物分享转化率。
  • 会员/支付指标:会员开通率、会员续费率、支付成功率、支付失败原因分布。
6.3 查询与报表实现
  • 通过 ClickHouse SQL 或 BI 工具(如 Superset/Grafana)配置固定报表。
  • 对接内部运营平台:
    • 提供 REST API(如 /api/metrics/dau/api/metrics/funnel)。
    • 查询参数包括日期范围、平台、渠道、版本、用户属性等。

7. 性能优化与可靠性策略

7.1 对业务代码的影响最小化
  • 非阻塞异步上报

    • SDK 将埋点写入内存队列后立即返回,不阻塞业务线程。
    • QPS 较高时,严格控制单次入队耗时(如不做复杂序列化和 IO)。
  • 批量发送

    • 按条数阈值(如 100 条)和时间阈值(如 200ms)组合触发。
    • 支持按模块/事件类型分批,避免单批过大影响网络抖动。
  • 轻量对象与序列化

    • 事件对象采用轻量字段类型;
    • 使用高效 JSON 序列化库(如 Jackson 配置复用、禁用反射开销大的特性)。
7.2 传输与写入性能
  • HTTP 层优化

    • 采集服务支持 Keep-Alive、连接池、压缩(如 gzip)。
    • 请求/响应体格式稳定,避免频繁变更导致客户端升级成本上升。
  • MQ 层优化

    • 批量确认、合理配置 prefetch count。
    • 使用独立埋点队列,保障与业务消息隔离。
  • ClickHouse 写入优化

    • 优先使用批量 INSERT(如每批 1000~5000 条)。
    • 根据查询模式合理设计 ORDER BY 和分区键。
    • 使用 LowCardinality 减少存储和内存占用。
7.3 降级与容错
  • SDK 降级

    • 内存队列满时,可按策略丢弃新增事件或仅保留关键事件。
    • 网络异常时,短时间内重试;超过阈值后进入静默期,防止雪崩。
  • 采集服务降级

    • 扩容为多实例部署,前接网关或负载均衡。
    • 临时关闭非关键校验以提升吞吐。
  • 消费者与 ClickHouse 降级

    • 写入失败时,按批记录到失败队列或本地备份表,用于补数。
    • 出现严重问题时,临时停止消费,优先保障业务写 MQ 能力。

8. 监控与告警设计

8.1 监控指标
  • SDK 侧

    • 队列长度、丢弃事件数量。
    • 上报 QPS、成功率、平均耗时。
  • 采集服务

    • 请求 QPS、耗时分布、非 2xx 比例。
    • 入 MQ 成功率、异常日志量。
  • MQ 层

    • 队列消息堆积量、延迟时间。
    • 消费速率与生产速率差值。
  • 消费者 & ClickHouse

    • 批量写入耗时、失败次数、重试次数。
    • ClickHouse 节点 CPU、内存、磁盘、查询耗时、拒绝率。
  • 分析 API

    • 接口 QPS、响应时间、错误率。
8.2 告警策略
  • 为上述指标设置阈值与告警规则,例如:
    • SDK 丢弃率 > 0.1% 持续 5 分钟。
    • 采集服务错误率 > 1%。
    • MQ 队列堆积超过 N 万条或延迟超过 5 分钟。
    • ClickHouse 写入失败批次数连续 N 次。
  • 告警渠道:邮件 + IM 群(如企业微信 / 飞书)+ 值班短信。
  • 告警分级:P0(影响核心数据可靠性)、P1(影响部分统计)、P2(预警)。

9. 与现有业务系统的集成方案

9.1 系统现状简述
  • 项目基于 Spring Boot 构建,已集成 Redis、RabbitMQ、MyBatis 等组件。
  • 业务模块涵盖:视频、视频、礼物系统、会员系统、私信系统等。
9.2 集成方式
  • 新增埋点模块

    • 在后端工程中新建 tracking 模块(包名如 com.video.tracking),包含:
      • 注解定义与 AOP 切面。
      • TrackingClient 接口及默认实现。
      • 采集服务客户端(调用 /api/track/v1/events/batch)。
  • 与现有过滤器/拦截器集成

    • 在已有登录 / 鉴权过滤器中,统一抽取 userIddeviceIdplatform 等信息存入 UserContext
    • 埋点 AOP 和业务代码从 UserContext 读取公共字段,保证一致性。
  • RabbitMQ 配置复用

    • 通过现有 RabbitMQConfig 新增埋点队列和交换机配置,避免重复连接管理。
  • ClickHouse 集成

    • 新增 ClickHouse 数据源配置,用于埋点消费者服务和分析服务访问。
    • 可独立为一个 Spring Boot 服务,与主业务服务解耦,避免影响核心交易链路。
  • 前后端协议约定

    • 对于前端直接埋点(如 H5),统一使用同一 HTTP 协议字段格式,必要时通过网关转发至埋点采集服务。
9.3 接入步骤示例
  1. 搭建 ClickHouse 集群并创建埋点表结构
  2. 新增埋点采集服务(可作为独立 Spring Boot 应用部署)。
  3. 在主业务服务中引入埋点 SDK 模块,并完成配置(采集服务地址、批量阈值等)。
  4. 对关键业务流程加注 @BizEvent 注解
    • 启动 / 首页曝光;
    • 视频开始/结束、对话发送、AI 回复;
    • 礼物送出、钻石充值;
    • 会员开通/续费、支付成功/失败等。
  5. 搭建埋点消费者服务,配置 MQ 消费组,写入 ClickHouse。
  6. 搭建监控与告警,接入现有监控系统(如 Prometheus + Grafana)。
  7. 和产品/运营协同验证数据,确保事件数、统计口径与预期一致。

10. 扩展与演进规划

  • 多租户/多应用支持

    • 在事件模型中增加 app_id / tenant_id 字段,支持多个 APP 共用一套埋点平台。
  • 隐私与合规

    • 对用户敏感数据进行脱敏或不入库,确保符合隐私政策与用户协议。
    • 结合现有“通用数据脱敏设计方案”与“隐私政策与用户协议管理系统设计”。
  • 智能分析

    • 后续可基于埋点数据进行用户分群、推荐及 A/B 实验等高级分析。
  • 数据回流业务

    • 将分析结果回流业务系统,用于精细化运营和个性化推荐,如:对高价值用户推送特定视频或会员优惠。

本设计文档为后端埋点系统的总体方案,后续可根据实际实现情况补充更详细的接口定义、表结构变更记录及运维手册。

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐