一、EMQX 简介

EMQX 是一款开源的高性能 分布式 MQTT 消息服务器,专为物联网(IoT)场景设计,支持海量设备连接与消息路由。其核心功能包括:

  1. 协议支持

    • MQTT v3.1/v3.1.1/v5.0:标准物联网通信协议。
    • 其他协议:CoAP、LwM2M、WebSocket、STOMP 等。
    • 多语言 SDK:支持 Java、Python、Go 等客户端开发。
  2. 核心特性

    • 高并发:单节点支持百万级 MQTT 连接。
    • 低延迟:微秒级消息路由(基于 Erlang/OTP 的高效调度)。
    • 集群扩展:水平扩展节点,自动负载均衡。
    • 规则引擎:通过 SQL 实时处理消息(如转发到 Kafka、数据库)。
    • 企业级安全:TLS 加密、ACL 权限控制、LDAP/MySQL 认证集成。
  3. 适用场景

    • 物联网设备管理(如智能家居、车联网)。
    • 实时数据采集与分发(如传感器数据、日志流)。
    • 消息中间件(连接应用系统与边缘设备)。
  4. 版本区别

    • 开源版(EMQX):基础功能免费,适合中小规模。
    • 企业版(EMQX Enterprise):支持更多协议、SLA 保障与高级功能(如数据桥接、离线消息)。

图片转载


二、在 Kubernetes 中部署 EMQX 集群

1. 部署架构
  • StatefulSet:确保每个 EMQX 节点有稳定的网络标识(Pod 名称)和持久化存储。
  • Headless Service:用于节点间发现与通信。
  • ConfigMap:配置 EMQX 集群参数与规则引擎。
2. 部署步骤

前提条件

  • Kubernetes 集群(1.20+)。
  • 已安装 kubectl 和 Helm(可选)。

步骤 1:创建 Namespace

kubectl create ns emqx

步骤 2:编写部署文件(YAML)

# emqx-cluster.yaml
---
# 配置 EMQX 参数
apiVersion: v1
kind: ConfigMap
metadata:
  name: emqx-config
  namespace: emqx
data:
  emqx.conf: |
    cluster {
      discovery = k8s
      name = emqx-cluster
      k8s {
        apiserver = "https://kubernetes.default.svc:443"
        service_name = emqx-headless
        namespace = emqx
        address_type = hostname
        app_name = emqx
      }
    }
    # 其他配置(如监听端口、认证)
    listeners.tcp.default {
      bind = "0.0.0.0:1883"
      max_connections = 1000000
    }
---
# 定义 Headless Service(节点发现)
apiVersion: v1
kind: Service
metadata:
  name: emqx-headless
  namespace: emqx
spec:
  clusterIP: None
  ports:
    - name: mqtt
      port: 1883
      targetPort: 1883
    - name: dashboard
      port: 18083
      targetPort: 18083
  selector:
    app: emqx
---
# 部署 EMQX StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: emqx
  namespace: emqx
spec:
  serviceName: emqx-headless
  replicas: 3
  selector:
    matchLabels:
      app: emqx
  template:
    metadata:
      labels:
        app: emqx
    spec:
      containers:
      - name: emqx
        image: emqx/emqx:5.4.0
        env:
        - name: EMQX_NODE_NAME
          value: "emqx@$(POD_NAME).emqx-headless.emqx.svc.cluster.local"
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        ports:
        - containerPort: 1883  # MQTT
        - containerPort: 8883  # MQTT/SSL
        - containerPort: 8083  # WebSocket
        - containerPort: 18083  # Dashboard
        volumeMounts:
        - name: emqx-data
          mountPath: /opt/emqx/data
        - name: config
          mountPath: /opt/emqx/etc/emqx.conf
          subPath: emqx.conf
      volumes:
      - name: config
        configMap:
          name: emqx-config
  volumeClaimTemplates:
  - metadata:
      name: emqx-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "standard"  # 根据集群存储类修改
      resources:
        requests:
          storage: 10Gi
---
# 对外暴露 Dashboard 和 MQTT 服务
apiVersion: v1
kind: Service
metadata:
  name: emqx-loadbalancer
  namespace: emqx
spec:
  type: LoadBalancer  # 或 NodePort/Ingress
  ports:
    - name: mqtt
      port: 1883
      targetPort: 1883
    - name: dashboard
      port: 18083
      targetPort: 18083
  selector:
    app: emqx

步骤 3:应用配置

kubectl apply -f emqx-cluster.yaml

步骤 4:验证部署

# 检查 Pod 状态
kubectl get pods -n emqx -l app=emqx

# 查看集群节点状态(需进入任意 Pod)
kubectl exec -it emqx-0 -n emqx -- emqx_ctl cluster status

三、EMQX 集群性能测试

1. 测试工具选择
  • emqtt-bench:EMQX 官方压测工具(基于 Erlang,适合 MQTT 协议)。
  • JMeter + MQTT 插件:图形化界面,支持复杂场景。
  • Xmeter:云原生压测平台(企业级)。
2. 使用 emqtt-bench 进行测试

步骤 1:安装 emqtt-bench

git clone https://github.com/emqx/emqtt-bench.git
cd emqtt-bench
make

步骤 2:模拟 10 万设备连接

# 启动 1000 个客户端,每个发布 1 条消息/秒,持续 60 秒
./emqtt_bench conn -c 100000 -h <EMQX-Service-IP> -p 1883 -t 60 --interval 10

步骤 3:测试消息吞吐量

# 启动 100 个发布者和 100 个订阅者
./emqtt_bench sub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/#
./emqtt_bench pub -c 100 -h <EMQX-Service-IP> -p 1883 -t bench/msg -m "hello" -s 256 -q 1

步骤 4:监控指标

  • EMQX Dashboardhttp://<LB-IP>:18083):查看连接数、消息速率、节点负载。
  • Prometheus + Grafana:集成 EMQX 的监控指标(如 emqx_cluster_nodes{node="emqx@node1", state="running"})。
3. 关键性能指标
  • 连接建立速率(connections/sec):每秒成功建立的 MQTT 连接数。
  • 消息吞吐量(msg/sec):每秒发布和消费的消息数量。
  • 端到端延迟(P99 Latency):99% 的消息从发布到订阅的延迟时间。
  • 资源利用率:CPU、内存、网络带宽占用。
4. 优化建议
  • 调整 EMQX 参数
    # emqx.conf
    listeners.tcp.default {
      max_connections = 1000000
      acceptor_pool_size = 16  # 根据 CPU 核数调整
    }
    
  • Kubernetes 资源配置
    • 为 EMQX Pod 分配足够 CPU/内存(如 4 CPU + 8GiB)。
    • 使用本地 SSD 存储(减少 I/O 延迟)。
  • 网络优化
    • 启用 MQTT over TLS(避免明文传输开销)。
    • 使用高性能 CNI 插件(如 Cilium)。

四、实战示例:车联网数据采集

场景

  • 10 万辆车辆每 5 秒上报 GPS 数据(消息大小 1KB)。
  • EMQX 集群将数据转发至 Kafka 供实时分析。

部署与测试

  1. 部署 EMQX 集群(如上述 YAML 配置)。
  2. 配置规则引擎(将数据转发到 Kafka):
    SELECT 
        payload.lat as latitude,
        payload.lng as longitude,
        clientid as vehicle_id
    FROM "vehicle/gps"
    
  3. 压测:使用 emqtt-bench 模拟 10 万客户端,验证吞吐量与延迟。
  4. 结果
    • 连接数稳定在 10 万,无断开。
    • 消息吞吐量 ≥ 20k msg/sec,P99 延迟 < 50ms。

五、总结

通过 Kubernetes 部署 EMQX 集群,您可以快速构建高可用、弹性扩展的物联网消息平台。结合性能测试工具,可验证集群在真实负载下的表现,并根据结果优化资源配置与 EMQX 参数。典型场景包括:

  • 智能工厂:设备状态实时监控。
  • 智慧城市:海量传感器数据聚合。
  • 实时通信:聊天应用、游戏服务器。

扩展阅读

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐