在这里插入图片描述

👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕一个云原生相关话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!


云原生 + 大数据:Spark on K8s 部署实战 🚀

“未来已来,只是分布不均。” —— 威廉·吉布森
在云计算与大数据交汇的今天,传统的 Hadoop 生态正经历一场深刻的变革。Kubernetes 已成为容器编排的事实标准,而 Spark 作为最流行的大数据分析引擎,两者的结合(Spark on Kubernetes)正在重塑大数据应用的部署、调度与运维方式。这不仅是技术的演进,更是架构思维的跃迁。

随着企业 IT 架构全面走向云原生,如何将 Spark 这样的批流一体计算框架无缝集成到 Kubernetes 平台,已成为 DevOps 工程师、大数据开发人员和 SRE 团队的核心议题。传统的 YARN 或 Mesos 调度模式虽然成熟,但在资源弹性、环境一致性、CI/CD 集成等方面逐渐显现出局限性。而 Spark on K8s 提供了一种全新的可能性:统一基础设施、动态扩缩容、声明式配置、服务网格集成,真正实现“计算即服务”。

本文将带你从零开始,深入探索 Spark on Kubernetes 的完整部署流程、核心原理、Java 编码实践与生产级优化策略。我们将通过大量可运行的 Java 代码示例、清晰的 mermaid 架构图以及真实可用的外部链接,构建一个端到端的实战指南。无论你是初学者还是希望提升生产部署能力的资深工程师,都能从中获得实用价值。


🔹 Spark on Kubernetes 核心架构解析

在深入编码与部署之前,我们必须先理解 Spark on K8s 的整体架构模型。与传统模式不同,Spark 不再依赖于 YARN ResourceManager 来分配 Executor,而是由 Kubernetes API Server 直接管理 Pod 生命周期。

✅ 架构组件概览

下图展示了 Spark on K8s 的典型工作流:

Spark Submit CLI / Client
Kubernetes API Server
Driver Pod
Kubelet on Node
Spark Application Master? No!
Submit Executor Pods via API Server
Executor Pod 1
Executor Pod 2
Executor Pod N
Shared Storage: S3/HDFS/NFS

如图所示,关键变化在于:

  • 无 ApplicationMaster:Spark 不再启动 AM,Driver 直接与 K8s API 对话。
  • Driver 和 Executors 均为 Pod:每个组件都是独立的 Kubernetes Pod,拥有自己的资源限制、标签和健康检查。
  • 直接调用 API Serverspark-submit 通过 Service Account 向 API Server 发起请求,创建 Driver 和 Executor Pod。

这种设计带来了显著优势:

特性 传统 YARN 模式 Spark on K8s
资源隔离 JVM 级别,易争抢 Pod 级别,强隔离
弹性伸缩 手动或脚本控制 原生 HPA + Cluster Autoscaler
环境一致性 需维护多套集群 统一镜像,Dev/QA/Prod 一致
CI/CD 集成 复杂 可与 ArgoCD/GitLab CI 深度集成

🔧 核心通信机制

当用户执行 spark-submit 时,其内部逻辑如下:

  1. 客户端解析参数并生成 Driver Pod Spec;
  2. 通过 K8s REST API 创建 Driver Pod;
  3. Driver 启动后,读取配置中的 spark.kubernetes.executor.podTemplateFile(如有),构造 Executor Pod 模板;
  4. Driver 使用 fabric8 kubernetes-client 库动态提交多个 Executor Pod;
  5. 所有 Pod 共享一个 Headless Service 用于内部通信;
  6. 计算完成后,Driver 向 API Server 发出清理指令,删除自身及所有 Executor。

💡 提示:官方推荐使用 Fabric8 Kubernetes Client 实现此类自动化操作。该库是 Java 生态中最活跃的 K8s SDK,支持 CRD、Operator 开发等高级功能。


🔹 准备 Kubernetes 集群环境

要运行 Spark on K8s,首先需要一个可用的 Kubernetes 集群。你可以选择以下任意一种方式:

  • Minikube(本地测试)
  • Kind(Kubernetes in Docker,CI 友好)
  • GKE / EKS / AKS(生产环境)
  • Rancher / OpenShift(企业私有化)

我们以 Minikube 为例进行演示,因其安装简单且适合学习。

🛠️ 安装 Minikube 与 kubectl

# 下载 kubectl
curl -LO "https://dl.k8s.io/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl && sudo mv kubectl /usr/local/bin/

# 安装 Minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

# 启动集群(建议至少 4CPUs + 8GB RAM)
minikube start --cpus=4 --memory=8192 --driver=docker

验证集群状态:

kubectl cluster-info
minikube status

你应该看到类似输出:

Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

🔐 配置 RBAC 权限

由于 Spark Driver 需要创建 Pod,必须为其绑定适当的角色。创建 spark-rbac.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["get", "create", "delete"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: spark-sa
  namespace: default
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

应用配置:

kubectl apply -f spark-rbac.yaml

📘 更详细的 RBAC 最佳实践可参考 Kubernetes 官方文档 - Using RBAC Authorization


🔹 构建 Spark 镜像并推送至仓库

Spark 官方提供了基础镜像,但为了支持自定义依赖(如 JDBC 驱动、Hadoop-AWS),我们需要构建专属镜像。

🐳 编写 Dockerfile

创建目录 spark-k8s-image 并添加以下文件:

Dockerfile

FROM apache/spark:v3.5.0

LABEL maintainer="data-engineer@example.com"

# 安装额外工具(可选)
USER root
RUN apt-get update && apt-get install -y \
    curl \
    netcat \
    && rm -rf /var/lib/apt/lists/*

# 切换回 spark 用户
USER ${spark_uid}

# 添加 AWS SDK 支持(用于访问 S3)
COPY jars/aws-java-sdk-bundle-1.12.633.jar /opt/spark/jars/
COPY jars/hadoop-aws-3.3.6.jar /opt/spark/jars/

# 设置 Hadoop 配置(避免 ClassNotFoundException)
ENV SPARK_DIST_CLASSPATH=$SPARK_HOME/jars/*:$HADOOP_HOME/share/hadoop/tools/lib/*

# 可选:预加载常用 connector
COPY jars/postgresql-42.7.3.jar /opt/spark/jars/
COPY jars/mongo-spark-connector_2.12-10.4.0.jar /opt/spark/jars/

WORKDIR /opt/spark

下载所需 JAR 包:

mkdir jars
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -O jars/hadoop-aws-3.3.6.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.633/aws-java-sdk-bundle-1.12.633.jar -O jars/aws-java-sdk-bundle-1.12.633.jar
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar -O jars/postgresql-42.7.3.jar
wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.4.0/mongo-spark-connector_2.12-10.4.0.jar -O jars/mongo-spark-connector_2.12-10.4.0.jar

🛠️ 构建并推送镜像

# 构建镜像
docker build -t my-spark:v3.5.0 .

# 标记并推送到 Docker Hub(替换 yourname)
docker tag my-spark:v3.5.0 yourname/my-spark:v3.5.0
docker push yourname/my-spark:v3.5.0

# 如果使用 Minikube,直接加载镜像(无需推送)
minikube image load my-spark:v3.5.0

✅ 推荐阅读:Apache Spark 官方 Docker 镜像说明


🔹 开发 Spark 应用程序(Java 示例)

现在进入核心环节——编写可在 Kubernetes 上运行的 Spark 程序。我们将实现几个典型场景:日志分析、数据库读写、流处理。

📦 Maven 项目结构

创建标准 Maven 工程:

<!-- pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.spark</groupId>
    <artifactId>spark-k8s-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spark.version>3.5.0</spark.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- PostgreSQL Connector -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.7.3</version>
        </dependency>
        <!-- MongoDB Connector -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_${scala.binary.version}</artifactId>
            <version>10.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

🧪 示例 1:Word Count(经典入门)

这是 Spark 的“Hello World”,用于统计文本中单词出现次数。

// src/main/java/com/example/spark/WordCountJob.java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class WordCountJob {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: WordCountJob <input-file>");
            System.exit(1);
        }

        String inputFile = args[0];

        SparkSession spark = SparkSession.builder()
                .appName("WordCountOnK8s")
                .getOrCreate();

        // 读取文本文件
        Dataset<String> lines = spark.read().textFile(inputFile);

        // 分词、计数
        Dataset<Row> wordCounts = lines
                .select(explode(split(col("value"), " ")).as("word"))
                .groupBy("word")
                .count()
                .orderBy(desc("count"));

        // 显示结果
        wordCounts.show(20);

        // 可选:保存到输出路径
        if (args.length > 1) {
            wordCounts.write().mode("overwrite").json(args[1]);
        }

        spark.stop();
    }
}

打包:

mvn clean package
# 输出:target/spark-k8s-demo-1.0-SNAPSHOT.jar

🧪 示例 2:读写 PostgreSQL 数据库

假设你有一个运行在 K8s 中的 PostgreSQL 实例,我们将演示如何使用 Spark 写入和查询数据。

启动 Postgres 实例
# postgres-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:15
        env:
        - name: POSTGRES_DB
          value: analytics
        - name: POSTGRES_USER
          value: sparkuser
        - name: POSTGRES_PASSWORD
          value: sparkpass
        ports:
        - containerPort: 5432
        volumeMounts:
        - name: data-volume
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: data-volume
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-svc
spec:
  selector:
    app: postgres
  ports:
    - protocol: TCP
      port: 5432
      targetPort: 5432
  type: ClusterIP

部署:

kubectl apply -f postgres-deploy.yaml
Java 代码:插入与查询用户数据
// src/main/java/com/example/spark/PostgresReadWrite.java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class PostgresReadWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkK8sPostgresDemo")
                .getOrCreate();

        String jdbcUrl = "jdbc:postgresql://postgres-svc:5432/analytics";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "sparkuser");
        connectionProperties.put("password", "sparkpass");
        connectionProperties.put("driver", "org.postgresql.Driver");

        // 创建示例数据
        Dataset<Row> userData = spark.sql("""
            SELECT * FROM VALUES 
              ('Alice', 25, 'Engineering'),
              ('Bob', 30, 'Marketing'),
              ('Charlie', 35, 'Sales')
            AS T(name, age, department)
            """);

        // 写入 PostgreSQL
        userData.write()
                .mode(SaveMode.Overwrite)
                .jdbc(jdbcUrl, "employees", connectionProperties);

        System.out.println("✅ Data written to PostgreSQL table 'employees'");

        // 从数据库读取
        Dataset<Row> df = spark.read()
                .jdbc(jdbcUrl, "employees", connectionProperties);

        df.createOrReplaceTempView("employees");

        // 查询年龄大于 28 的员工
        Dataset<Row> result = spark.sql("SELECT name, age FROM employees WHERE age > 28");
        result.show();

        spark.stop();
    }
}

🧪 示例 3:MongoDB 流式数据处理

我们将使用 Spark Structured Streaming 从 Kafka 读取 JSON 消息,并写入 MongoDB。

启动 Kafka 与 MongoDB

使用 Helm 快速部署:

# 添加 Bitnami 仓库
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update

# 安装 Kafka
helm install kafka bitnami/kafka --set replicaCount=1,controller.replicaCount=1

# 安装 MongoDB
helm install mongodb bitnami/mongodb --set auth.rootPassword=secret,password=secret

获取连接信息:

echo "Kafka Broker: kafka.default.svc.cluster.local:9092"
echo "MongoDB URI: mongodb://root:secret@mongodb:27017"
Java 代码:实时订单处理
// src/main/java/com/example/spark/StreamingToMongo.java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.HashMap;
import java.util.Map;

public class StreamingToMongo {
    // 定义订单类
    public static class Order {
        public String orderId;
        public String product;
        public int quantity;
        public double price;
        public long timestamp;

        public Order() {}

        public Order(String orderId, String product, int quantity, double price) {
            this.orderId = orderId;
            this.product = product;
            this.quantity = quantity;
            this.price = price;
            this.timestamp = System.currentTimeMillis();
        }
    }

    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .appName("StreamingToMongoDB")
                .getOrCreate();

        spark.sparkContext().setLogLevel("WARN");

        // 从 Kafka 读取流
        Dataset<String> kafkaStream = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka.default.svc.cluster.local:9092")
                .option("subscribe", "orders")
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // 解析 JSON 并转换为 Order 对象
        Dataset<Order> orders = kafkaStream.map(json -> {
            // 简化处理:假设 JSON 格式为 {"orderId":"O1","product":"Laptop","quantity":1,"price":999.99}
            // 实际应使用 Jackson/Gson 解析
            Order order = new Order();
            // 此处省略完整解析逻辑,仅作示意
            return order;
        }, Encoders.bean(Order.class));

        // 写入 MongoDB
        Map<String, String> writeOptions = new HashMap<>();
        writeOptions.put("uri", "mongodb://root:secret@mongodb:27017");
        writeOptions.put("database", "sales");
        writeOptions.put("collection", "orders");

        StreamingQuery query = orders.writeStream()
                .outputMode("append")
                .format("mongodb.spark.sql.DefaultSource")
                .options(writeOptions)
                .option("checkpointLocation", "/tmp/checkpoint") // 注意:需挂载持久卷
                .start();

        System.out.println("🚀 Streaming query started. Waiting for data...");

        query.awaitTermination();
    }
}

📘 学习更多:MongoDB Spark Connector 官方文档


🔹 使用 spark-submit 部署到 Kubernetes

完成代码开发后,即可使用 spark-submit 将任务提交至 K8s 集群。

🚀 提交 WordCount 任务

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$(minikube ip):8443 \
  --deploy-mode cluster \
  --name spark-wordcount \
  --class com.example.spark.WordCountJob \
  --conf spark.kubernetes.container.image=yourname/my-spark:v3.5.0 \
  --conf spark.kubernetes.authenticate.serviceAccountName=spark-sa \
  --conf spark.executor.instances=2 \
  --conf spark.executor.memory=1g \
  --conf spark.driver.memory=1g \
  local:///opt/spark/examples/jars/spark-k8s-demo-1.0-SNAPSHOT.jar \
  s3a://my-data-bucket/logs/access.log

⚠️ 注意:

  • local:// 表示 JAR 包已内置在镜像中;
  • 若 JAR 在本地,需使用 --files 或提前上传;
  • S3 访问需配置 AWS 凭证(可通过 ConfigMap 或 Secret 注入)。

🔐 注入 AWS 凭证(Secret 方式)

# aws-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: aws-creds
type: Opaque
data:
  AWS_ACCESS_KEY_ID: QUtJQV...  # base64 编码
  AWS_SECRET_ACCESS_KEY: bnphcGQ= 

spark-submit 中引用:

--conf spark.kubernetes.authenticate.driver.secret.name=aws-creds \
--conf spark.hadoop.fs.s3a.access.key=$(AWS_ACCESS_KEY_ID) \
--conf spark.hadoop.fs.s3a.secret.key=$(AWS_SECRET_ACCESS_KEY)

🔹 高级配置与性能调优

📊 资源请求与限制

合理设置 CPU 和内存对稳定性至关重要:

--conf spark.driver.memory=2g \
--conf spark.driver.cores=1 \
--conf spark.executor.memory=4g \
--conf spark.executor.cores=2 \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.kubernetes.executor.limit.cores=3

🧩 Pod 模板定制

使用 podTemplateFile 精细控制 Pod 配置:

# driver-template.yaml
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: spark-kubernetes-driver
      env:
        - name: SPARK_LOCAL_DIRS
          value: /tmp
        - name: JAVA_OPTS
          value: "-XX:+UseG1GC"
      volumeMounts:
        - name: temp-volume
          mountPath: /tmp
  volumes:
    - name: temp-volume
      emptyDir: {}

提交时指定:

--conf spark.kubernetes.driver.podTemplateFile=driver-template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=executor-template.yaml

🔄 自动伸缩(Elastic Scaling)

结合 K8s Vertical Pod Autoscaler (VPA) 或自定义 Operator 实现动态调整 Executor 数量。


🔹 监控与日志收集

📈 Prometheus + Grafana 集成

Spark 支持将指标导出为 Prometheus 格式:

--conf spark.metrics.conf.*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet \
--conf spark.metrics.conf.*.sink.prometheusServlet.path=/metrics/prometheus

然后通过 kube-prometheus-stack 采集。

📋 日志查看

# 查看 Driver 日志
kubectl logs -f spark-wordcount-driver

# 查看 Executor 日志
kubectl get pods | grep executor | awk '{print $1}' | xargs -I {} kubectl logs {}

🔹 生产环境最佳实践

  • ✅ 使用 Helm Chart 管理 Spark 应用部署;
  • ✅ 所有镜像使用 固定版本标签,禁止 latest
  • ✅ 敏感信息通过 VaultKubernetes Secrets 管理;
  • ✅ 启用 Pod Security PoliciesOPA Gatekeeper
  • ✅ 结合 Argo CD 实现 GitOps 部署;
  • ✅ 定期审计 RBAC 权限,遵循最小权限原则。

🔹 总结与展望

Spark on Kubernetes 不仅是一种部署方式的改变,更代表着大数据平台向云原生架构的全面转型。通过本文的实战演练,我们完成了从环境搭建、代码开发到集群部署的全流程,掌握了 Java 应用在 K8s 上运行的关键技能。

未来,随着 Spark History Server on K8sAdaptive Query ExecutionProject Zen 等特性的完善,Spark on K8s 将在性能、可观测性和易用性上持续进化。

🌐 拓展阅读:

拥抱云原生,让大数据更轻盈、更敏捷、更具韧性。🚀


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐