在进行高性能网络编程(如 Epoll、io_uring 服务器开发)时,我们经常需要验证服务器的吞吐量。虽然市面上有 wrkabJMeter 等成熟工具,但有时候我们需要自定义协议格式,或者想要更深入理解压测客户端的底层原理。

今天,我们就用 C 语言手撸一个轻量级的 TCP QPS 压测客户端。它支持多线程并发、自定义请求包大小,并能实时计算 QPS(每秒查询率)。

一、 设计思路

我们的压测工具的核心逻辑非常简单,类似于“人海战术”:

  1. 参数解析:使用 getopt 解析命令行参数(服务器 IP、端口、线程数、总请求数等)。

  2. 多线程并发:根据 -t 参数启动 N 个线程。

  3. 建立连接:每个线程独立连接服务器(模拟多个客户端)。

  4. 循环发送:每个线程在一个 while 循环中,不断地发送数据包并等待接收(Send-Recv 模型)。

  5. 统计结果:主线程等待所有子线程结束,统计总耗时,计算 QPS。

// 假设命令是: ./app -p 9999 -t 10

while ((opt = getopt(..., "p:t:")) != -1) { // 循环直到返回 -1 (没参数了)
    
    // 第一轮循环:
    // opt 变成了 'p'
    // optarg 指向了 "9999"
    
    // 第二轮循环:
    // opt 变成了 't'
    // optarg 指向了 "10"
    
    switch (opt) {
        case 'p': port = atoi(optarg); break; // 拿走 9999
        case 't': thread = atoi(optarg); break; // 拿走 10
    }
}

总结:getopt 就是帮你把乱七八糟的命令行输入,整理成一个个清晰的 Key-Value 对,让你直接在 switch 语句里处理。

二、 核心代码深度解析

1. 命令行参数解析与配置

我们需要一个上下文结构体来在线程间传递配置信息。使用 getopt 是 Linux 下解析参数的标准姿势。

typedef struct test_context_s {
    char serverip[16];
    int port;
    int threadnum;      // 线程数
    int connection;     // 连接数(本例中阻塞模式下 1线程=1连接)
    int requestion;     // 总请求数 (所有线程加起来的任务总量)
    int failed;         // 失败计数
} test_context_t;

2. 核心压测逻辑 (Worker Thread)

这是每个线程执行的函数 test_qps_entry。它的工作流程是: 连接 -> 计算任务量 -> 循环(发包-收包) -> 关闭

static void *test_qps_entry(void *arg) {
    // 把线程传进来的那个“什么都能装”的通用指针(void* arg),强制还原成我们可以识别的“配置单指针”(test_context_t*)
    test_context_t *pctx = (test_context_t*)arg;

    // 1. 建立 TCP 连接
    int connfd = connect_tcpserver(pctx->serverip, pctx->port);
    if (connfd < 0) {
        printf("connect_tcpserver failed\n");
        return NULL;
    }

    // 2. 任务分发:总请求数 / 线程数 = 该线程的任务量
    int count = pctx->requestion / pctx->threadnum;
    int i = 0;
    int res;

    // 3. 核心循环
    while (i++ < count) {
        res = send_recv_tcppkt(connfd); // 发送并接收
        if (res != 0) {
            printf("send_recv_tcppkt failed\n");
            pctx->failed++; // 记录失败(注意:简易版未加锁,生产环境建议原子操作)
            continue;
        }
    }
    
    // 4. 只有跑完所有请求才断开连接,模拟长连接压测
    close(connfd); 
    return NULL;
}

3. 模拟业务数据包 (Send/Recv)

为了模拟真实的负载,我们不仅仅是发一个字节。在 send_recv_tcppkt 函数中,我们构造了一个较大的数据包(将 TEST_MESSAGE 拼接 24 次)。

  • Send: 发送拼接好的长字符串。

  • Recv: 阻塞等待服务器回显数据。

  • Check: 校验收到的数据是否和发送的一致(验证服务器逻辑正确性)。

// 循环拼接 24 次短消息,构造一个较大的数据包以模拟真实的网络负载
for(i = 0; i < 24; i++){
    strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
}

// 发送
int res = send(fd, wbuffer, strlen(wbuffer), 0);
// 接收
res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);

4. QPS 计算公式

QPS 的计算非常直观:总请求数 / 总耗时(秒)。 为了精度,我们通常用毫秒计算:

int time_used = TIME_SUB_MS(tv_end, tv_begin);
printf("qps: %d\n", ctx.requestion * 1000 / time_used);

三、 完整源码展示

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>

#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>

typedef struct test_context_s{
    char serverip[16];
    int port;
    int threadnum;
    int connection;
    int requestion;

#if 1
    int failed;
#endif

} test_context_t;

typedef struct test_context_s test_context_t;

int connect_tcpserver(const char *ip, unsigned short port){

    int connfd = socket(AF_INET,SOCK_STREAM,0);

    struct sockaddr_in tcpserver_addr;
    memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));

    tcpserver_addr.sin_family = AF_INET;
    tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
    tcpserver_addr.sin_port = htons(port);

    int ret = connect(connfd, (struct sockaddr*)&tcpserver_addr, sizeof(struct sockaddr_in));
    if(ret){
        perror("connect\n");
        return -1;
    }

    return connfd;
}

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TEST_MESSAGE    "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH      2048
#define WBUFFER_LENGTH      2048

int send_recv_tcppkt(int fd){


#if 0
    int res = send(fd, TEST_MESSAGE, strlen(TEST_MESSAGE),0);
    if(res < 0){
        exit(1);
    }

    char rbuffer[RBUFFER_LENGTH] = {0};
    res = recv(fd, rbuffer, RBUFFER_LENGTH,0);
    if(res <= 0){
        exit(1);
    }

    if(strcmp(rbuffer, TEST_MESSAGE) != 0){
        printf("failed: '%s' != '%s'\n",rbuffer,TEST_MESSAGE);
        return -1;
    }

#else

    char wbuffer[WBUFFER_LENGTH] = {0};
    int i = 0;

    for(i = 0;i<24;i++){
        strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
    }

    int res = send(fd, wbuffer, strlen(wbuffer), 0);
    if(res < 0){
        exit(1);
    }

    char rbuffer[RBUFFER_LENGTH] = {0};
    res = recv(fd, rbuffer, RBUFFER_LENGTH,0);
    if(res <= 0){
        exit(1);
    }

    if(strcmp(rbuffer, wbuffer) != 0){
        printf("failed: '%s' != '%s'\n",rbuffer,wbuffer);
        return -1;
    }

#endif
    return 0;
}

static void *test_qps_entry(void *arg){

    test_context_t *pctx = (test_context_t*)arg;

   // int conn_num = pctx->connection / pctx->threadnum;

    int connfd = connect_tcpserver(pctx->serverip, pctx->port);
    if(connfd < 0){
        printf("connect_tcpserver failed\n");
        return NULL;
    }

    int count = pctx->requestion / pctx->threadnum;
    int i =0;

    int res;

    while(i++ < count) {
        res = send_recv_tcppkt(connfd);
        if(res != 0){
            printf("send_recv_tcppkt failed\n");
            pctx->failed ++;
            continue; 
        }
    }
    return NULL;
}

int main(int argc, char *argv[]){

    int ret = 0;
    test_context_t ctx = {0};

    int opt;

    while((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1){

        switch(opt){

            case 's':
                printf("-s: %s\n", optarg);
                strcpy(ctx.serverip, optarg);
                break;
            case 'p':
                printf("-p: %s\n", optarg);
                ctx.port = atoi(optarg);
                break;
            case 't':
                printf("-t: %s\n", optarg);
                ctx.threadnum = atoi(optarg);
                break;
            case 'c':
                printf("-c: %s\n", optarg);
                ctx.connection = atoi(optarg);
                break;
            case 'n':
                printf("-n: %s\n", optarg);
                ctx.requestion = atoi(optarg);
                break;
            default:
                return -1;
        }
    }

    pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));
    int i=0;

    struct timeval tv_begin;
    gettimeofday(&tv_begin, NULL);
    for(i = 0;i< ctx.threadnum; i++){
        pthread_create(&ptid[i], NULL,test_qps_entry, &ctx);
    }
    for(i = 0;i< ctx.threadnum; i++){
        pthread_join(ptid[i], NULL);
    }
    
    struct timeval tv_end;
    gettimeofday(&tv_end, NULL);

    int time_used = TIME_SUB_MS(tv_end, tv_begin);

    TIME_SUB_MS(tv_end, tv_begin);

    printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion-ctx.failed, ctx.failed,
        time_used, ctx.requestion * 1000 / time_used);

clean:
    free(ptid);
    return ret;

}

四、 总结与局限性

这个工具是一个典型的 BIO (Blocking I/O) 客户端

  • 模型:One Thread Per Connection(一个线程对应一个连接)。

  • 局限性:如果你指定 -c 1000(1000个连接),在当前代码逻辑下,你必须开启 -t 1000(1000个线程)。这在连接数极高时(例如几万并发)会因为线程切换开销过大而导致客户端先成为瓶颈。

  • 适用场景:测试中小规模并发(100~1000连接)下的服务器业务处理能力和吞吐量。

下一步进阶: 如果要测试 10万+ 并发连接 的场景,需要将此客户端改造为 Epoll 非阻塞模式,仅用单线程或少量线程即可管理海量连接。

希望这篇文章能帮你掌握网络压测工具的基本原理!如果你在学习 Epoll 或 io_uring,这个工具将是你调试服务器性能的好帮手。

0voice · GitHub

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐