云原生 + 大数据:Spark on K8s 部署实战
Spark on K8s 部署实战:云原生大数据架构解析 本文深入探讨 Spark 在 Kubernetes 上的部署方案,通过架构解析、环境搭建和 RBAC 配置,实现从传统 YARN 到云原生的技术升级。核心内容包括: 架构优势:Spark on K8s 采用无 ApplicationMaster 设计,Driver 直接与 API Server 交互,实现 Pod 级资源隔离和弹性伸缩 环境
👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长。
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕一个云原生相关话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!
文章目录
云原生 + 大数据:Spark on K8s 部署实战 🚀
“未来已来,只是分布不均。” —— 威廉·吉布森
在云计算与大数据交汇的今天,传统的 Hadoop 生态正经历一场深刻的变革。Kubernetes 已成为容器编排的事实标准,而 Spark 作为最流行的大数据分析引擎,两者的结合(Spark on Kubernetes)正在重塑大数据应用的部署、调度与运维方式。这不仅是技术的演进,更是架构思维的跃迁。
随着企业 IT 架构全面走向云原生,如何将 Spark 这样的批流一体计算框架无缝集成到 Kubernetes 平台,已成为 DevOps 工程师、大数据开发人员和 SRE 团队的核心议题。传统的 YARN 或 Mesos 调度模式虽然成熟,但在资源弹性、环境一致性、CI/CD 集成等方面逐渐显现出局限性。而 Spark on K8s 提供了一种全新的可能性:统一基础设施、动态扩缩容、声明式配置、服务网格集成,真正实现“计算即服务”。
本文将带你从零开始,深入探索 Spark on Kubernetes 的完整部署流程、核心原理、Java 编码实践与生产级优化策略。我们将通过大量可运行的 Java 代码示例、清晰的 mermaid
架构图以及真实可用的外部链接,构建一个端到端的实战指南。无论你是初学者还是希望提升生产部署能力的资深工程师,都能从中获得实用价值。
🔹 Spark on Kubernetes 核心架构解析
在深入编码与部署之前,我们必须先理解 Spark on K8s 的整体架构模型。与传统模式不同,Spark 不再依赖于 YARN ResourceManager 来分配 Executor,而是由 Kubernetes API Server 直接管理 Pod 生命周期。
✅ 架构组件概览
下图展示了 Spark on K8s 的典型工作流:
如图所示,关键变化在于:
- 无 ApplicationMaster:Spark 不再启动 AM,Driver 直接与 K8s API 对话。
- Driver 和 Executors 均为 Pod:每个组件都是独立的 Kubernetes Pod,拥有自己的资源限制、标签和健康检查。
- 直接调用 API Server:
spark-submit
通过 Service Account 向 API Server 发起请求,创建 Driver 和 Executor Pod。
这种设计带来了显著优势:
特性 | 传统 YARN 模式 | Spark on K8s |
---|---|---|
资源隔离 | JVM 级别,易争抢 | Pod 级别,强隔离 |
弹性伸缩 | 手动或脚本控制 | 原生 HPA + Cluster Autoscaler |
环境一致性 | 需维护多套集群 | 统一镜像,Dev/QA/Prod 一致 |
CI/CD 集成 | 复杂 | 可与 ArgoCD/GitLab CI 深度集成 |
🔧 核心通信机制
当用户执行 spark-submit
时,其内部逻辑如下:
- 客户端解析参数并生成 Driver Pod Spec;
- 通过 K8s REST API 创建 Driver Pod;
- Driver 启动后,读取配置中的
spark.kubernetes.executor.podTemplateFile
(如有),构造 Executor Pod 模板; - Driver 使用
fabric8 kubernetes-client
库动态提交多个 Executor Pod; - 所有 Pod 共享一个 Headless Service 用于内部通信;
- 计算完成后,Driver 向 API Server 发出清理指令,删除自身及所有 Executor。
💡 提示:官方推荐使用 Fabric8 Kubernetes Client 实现此类自动化操作。该库是 Java 生态中最活跃的 K8s SDK,支持 CRD、Operator 开发等高级功能。
🔹 准备 Kubernetes 集群环境
要运行 Spark on K8s,首先需要一个可用的 Kubernetes 集群。你可以选择以下任意一种方式:
- Minikube(本地测试)
- Kind(Kubernetes in Docker,CI 友好)
- GKE / EKS / AKS(生产环境)
- Rancher / OpenShift(企业私有化)
我们以 Minikube 为例进行演示,因其安装简单且适合学习。
🛠️ 安装 Minikube 与 kubectl
# 下载 kubectl
curl -LO "https://dl.k8s.io/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl && sudo mv kubectl /usr/local/bin/
# 安装 Minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
# 启动集群(建议至少 4CPUs + 8GB RAM)
minikube start --cpus=4 --memory=8192 --driver=docker
验证集群状态:
kubectl cluster-info
minikube status
你应该看到类似输出:
Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
🔐 配置 RBAC 权限
由于 Spark Driver 需要创建 Pod,必须为其绑定适当的角色。创建 spark-rbac.yaml
:
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark-sa
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: spark-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "create", "delete"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: default
subjects:
- kind: ServiceAccount
name: spark-sa
namespace: default
roleRef:
kind: Role
name: spark-role
apiGroup: rbac.authorization.k8s.io
应用配置:
kubectl apply -f spark-rbac.yaml
📘 更详细的 RBAC 最佳实践可参考 Kubernetes 官方文档 - Using RBAC Authorization。
🔹 构建 Spark 镜像并推送至仓库
Spark 官方提供了基础镜像,但为了支持自定义依赖(如 JDBC 驱动、Hadoop-AWS),我们需要构建专属镜像。
🐳 编写 Dockerfile
创建目录 spark-k8s-image
并添加以下文件:
Dockerfile
FROM apache/spark:v3.5.0
LABEL maintainer="data-engineer@example.com"
# 安装额外工具(可选)
USER root
RUN apt-get update && apt-get install -y \
curl \
netcat \
&& rm -rf /var/lib/apt/lists/*
# 切换回 spark 用户
USER ${spark_uid}
# 添加 AWS SDK 支持(用于访问 S3)
COPY jars/aws-java-sdk-bundle-1.12.633.jar /opt/spark/jars/
COPY jars/hadoop-aws-3.3.6.jar /opt/spark/jars/
# 设置 Hadoop 配置(避免 ClassNotFoundException)
ENV SPARK_DIST_CLASSPATH=$SPARK_HOME/jars/*:$HADOOP_HOME/share/hadoop/tools/lib/*
# 可选:预加载常用 connector
COPY jars/postgresql-42.7.3.jar /opt/spark/jars/
COPY jars/mongo-spark-connector_2.12-10.4.0.jar /opt/spark/jars/
WORKDIR /opt/spark
下载所需 JAR 包:
mkdir jars
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -O jars/hadoop-aws-3.3.6.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.633/aws-java-sdk-bundle-1.12.633.jar -O jars/aws-java-sdk-bundle-1.12.633.jar
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar -O jars/postgresql-42.7.3.jar
wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.4.0/mongo-spark-connector_2.12-10.4.0.jar -O jars/mongo-spark-connector_2.12-10.4.0.jar
🛠️ 构建并推送镜像
# 构建镜像
docker build -t my-spark:v3.5.0 .
# 标记并推送到 Docker Hub(替换 yourname)
docker tag my-spark:v3.5.0 yourname/my-spark:v3.5.0
docker push yourname/my-spark:v3.5.0
# 如果使用 Minikube,直接加载镜像(无需推送)
minikube image load my-spark:v3.5.0
✅ 推荐阅读:Apache Spark 官方 Docker 镜像说明
🔹 开发 Spark 应用程序(Java 示例)
现在进入核心环节——编写可在 Kubernetes 上运行的 Spark 程序。我们将实现几个典型场景:日志分析、数据库读写、流处理。
📦 Maven 项目结构
创建标准 Maven 工程:
<!-- pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.spark</groupId>
<artifactId>spark-k8s-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<spark.version>3.5.0</spark.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- PostgreSQL Connector -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
<!-- MongoDB Connector -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_${scala.binary.version}</artifactId>
<version>10.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
🧪 示例 1:Word Count(经典入门)
这是 Spark 的“Hello World”,用于统计文本中单词出现次数。
// src/main/java/com/example/spark/WordCountJob.java
package com.example.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
public class WordCountJob {
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: WordCountJob <input-file>");
System.exit(1);
}
String inputFile = args[0];
SparkSession spark = SparkSession.builder()
.appName("WordCountOnK8s")
.getOrCreate();
// 读取文本文件
Dataset<String> lines = spark.read().textFile(inputFile);
// 分词、计数
Dataset<Row> wordCounts = lines
.select(explode(split(col("value"), " ")).as("word"))
.groupBy("word")
.count()
.orderBy(desc("count"));
// 显示结果
wordCounts.show(20);
// 可选:保存到输出路径
if (args.length > 1) {
wordCounts.write().mode("overwrite").json(args[1]);
}
spark.stop();
}
}
打包:
mvn clean package
# 输出:target/spark-k8s-demo-1.0-SNAPSHOT.jar
🧪 示例 2:读写 PostgreSQL 数据库
假设你有一个运行在 K8s 中的 PostgreSQL 实例,我们将演示如何使用 Spark 写入和查询数据。
启动 Postgres 实例
# postgres-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15
env:
- name: POSTGRES_DB
value: analytics
- name: POSTGRES_USER
value: sparkuser
- name: POSTGRES_PASSWORD
value: sparkpass
ports:
- containerPort: 5432
volumeMounts:
- name: data-volume
mountPath: /var/lib/postgresql/data
volumes:
- name: data-volume
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: postgres-svc
spec:
selector:
app: postgres
ports:
- protocol: TCP
port: 5432
targetPort: 5432
type: ClusterIP
部署:
kubectl apply -f postgres-deploy.yaml
Java 代码:插入与查询用户数据
// src/main/java/com/example/spark/PostgresReadWrite.java
package com.example.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
public class PostgresReadWrite {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("SparkK8sPostgresDemo")
.getOrCreate();
String jdbcUrl = "jdbc:postgresql://postgres-svc:5432/analytics";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "sparkuser");
connectionProperties.put("password", "sparkpass");
connectionProperties.put("driver", "org.postgresql.Driver");
// 创建示例数据
Dataset<Row> userData = spark.sql("""
SELECT * FROM VALUES
('Alice', 25, 'Engineering'),
('Bob', 30, 'Marketing'),
('Charlie', 35, 'Sales')
AS T(name, age, department)
""");
// 写入 PostgreSQL
userData.write()
.mode(SaveMode.Overwrite)
.jdbc(jdbcUrl, "employees", connectionProperties);
System.out.println("✅ Data written to PostgreSQL table 'employees'");
// 从数据库读取
Dataset<Row> df = spark.read()
.jdbc(jdbcUrl, "employees", connectionProperties);
df.createOrReplaceTempView("employees");
// 查询年龄大于 28 的员工
Dataset<Row> result = spark.sql("SELECT name, age FROM employees WHERE age > 28");
result.show();
spark.stop();
}
}
🧪 示例 3:MongoDB 流式数据处理
我们将使用 Spark Structured Streaming 从 Kafka 读取 JSON 消息,并写入 MongoDB。
启动 Kafka 与 MongoDB
使用 Helm 快速部署:
# 添加 Bitnami 仓库
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
# 安装 Kafka
helm install kafka bitnami/kafka --set replicaCount=1,controller.replicaCount=1
# 安装 MongoDB
helm install mongodb bitnami/mongodb --set auth.rootPassword=secret,password=secret
获取连接信息:
echo "Kafka Broker: kafka.default.svc.cluster.local:9092"
echo "MongoDB URI: mongodb://root:secret@mongodb:27017"
Java 代码:实时订单处理
// src/main/java/com/example/spark/StreamingToMongo.java
package com.example.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.HashMap;
import java.util.Map;
public class StreamingToMongo {
// 定义订单类
public static class Order {
public String orderId;
public String product;
public int quantity;
public double price;
public long timestamp;
public Order() {}
public Order(String orderId, String product, int quantity, double price) {
this.orderId = orderId;
this.product = product;
this.quantity = quantity;
this.price = price;
this.timestamp = System.currentTimeMillis();
}
}
public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder()
.appName("StreamingToMongoDB")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
// 从 Kafka 读取流
Dataset<String> kafkaStream = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "kafka.default.svc.cluster.local:9092")
.option("subscribe", "orders")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
// 解析 JSON 并转换为 Order 对象
Dataset<Order> orders = kafkaStream.map(json -> {
// 简化处理:假设 JSON 格式为 {"orderId":"O1","product":"Laptop","quantity":1,"price":999.99}
// 实际应使用 Jackson/Gson 解析
Order order = new Order();
// 此处省略完整解析逻辑,仅作示意
return order;
}, Encoders.bean(Order.class));
// 写入 MongoDB
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put("uri", "mongodb://root:secret@mongodb:27017");
writeOptions.put("database", "sales");
writeOptions.put("collection", "orders");
StreamingQuery query = orders.writeStream()
.outputMode("append")
.format("mongodb.spark.sql.DefaultSource")
.options(writeOptions)
.option("checkpointLocation", "/tmp/checkpoint") // 注意:需挂载持久卷
.start();
System.out.println("🚀 Streaming query started. Waiting for data...");
query.awaitTermination();
}
}
📘 学习更多:MongoDB Spark Connector 官方文档
🔹 使用 spark-submit 部署到 Kubernetes
完成代码开发后,即可使用 spark-submit
将任务提交至 K8s 集群。
🚀 提交 WordCount 任务
$SPARK_HOME/bin/spark-submit \
--master k8s://https://$(minikube ip):8443 \
--deploy-mode cluster \
--name spark-wordcount \
--class com.example.spark.WordCountJob \
--conf spark.kubernetes.container.image=yourname/my-spark:v3.5.0 \
--conf spark.kubernetes.authenticate.serviceAccountName=spark-sa \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=1g \
--conf spark.driver.memory=1g \
local:///opt/spark/examples/jars/spark-k8s-demo-1.0-SNAPSHOT.jar \
s3a://my-data-bucket/logs/access.log
⚠️ 注意:
local://
表示 JAR 包已内置在镜像中;- 若 JAR 在本地,需使用
--files
或提前上传;- S3 访问需配置 AWS 凭证(可通过 ConfigMap 或 Secret 注入)。
🔐 注入 AWS 凭证(Secret 方式)
# aws-secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: aws-creds
type: Opaque
data:
AWS_ACCESS_KEY_ID: QUtJQV... # base64 编码
AWS_SECRET_ACCESS_KEY: bnphcGQ=
在 spark-submit
中引用:
--conf spark.kubernetes.authenticate.driver.secret.name=aws-creds \
--conf spark.hadoop.fs.s3a.access.key=$(AWS_ACCESS_KEY_ID) \
--conf spark.hadoop.fs.s3a.secret.key=$(AWS_SECRET_ACCESS_KEY)
🔹 高级配置与性能调优
📊 资源请求与限制
合理设置 CPU 和内存对稳定性至关重要:
--conf spark.driver.memory=2g \
--conf spark.driver.cores=1 \
--conf spark.executor.memory=4g \
--conf spark.executor.cores=2 \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.kubernetes.executor.limit.cores=3
🧩 Pod 模板定制
使用 podTemplateFile
精细控制 Pod 配置:
# driver-template.yaml
apiVersion: v1
kind: Pod
spec:
containers:
- name: spark-kubernetes-driver
env:
- name: SPARK_LOCAL_DIRS
value: /tmp
- name: JAVA_OPTS
value: "-XX:+UseG1GC"
volumeMounts:
- name: temp-volume
mountPath: /tmp
volumes:
- name: temp-volume
emptyDir: {}
提交时指定:
--conf spark.kubernetes.driver.podTemplateFile=driver-template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=executor-template.yaml
🔄 自动伸缩(Elastic Scaling)
结合 K8s Vertical Pod Autoscaler (VPA) 或自定义 Operator 实现动态调整 Executor 数量。
🔹 监控与日志收集
📈 Prometheus + Grafana 集成
Spark 支持将指标导出为 Prometheus 格式:
--conf spark.metrics.conf.*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet \
--conf spark.metrics.conf.*.sink.prometheusServlet.path=/metrics/prometheus
然后通过 kube-prometheus-stack
采集。
📋 日志查看
# 查看 Driver 日志
kubectl logs -f spark-wordcount-driver
# 查看 Executor 日志
kubectl get pods | grep executor | awk '{print $1}' | xargs -I {} kubectl logs {}
🔹 生产环境最佳实践
- ✅ 使用 Helm Chart 管理 Spark 应用部署;
- ✅ 所有镜像使用 固定版本标签,禁止
latest
; - ✅ 敏感信息通过 Vault 或 Kubernetes Secrets 管理;
- ✅ 启用 Pod Security Policies 或 OPA Gatekeeper;
- ✅ 结合 Argo CD 实现 GitOps 部署;
- ✅ 定期审计 RBAC 权限,遵循最小权限原则。
🔹 总结与展望
Spark on Kubernetes 不仅是一种部署方式的改变,更代表着大数据平台向云原生架构的全面转型。通过本文的实战演练,我们完成了从环境搭建、代码开发到集群部署的全流程,掌握了 Java 应用在 K8s 上运行的关键技能。
未来,随着 Spark History Server on K8s、Adaptive Query Execution、Project Zen 等特性的完善,Spark on K8s 将在性能、可观测性和易用性上持续进化。
🌐 拓展阅读:
拥抱云原生,让大数据更轻盈、更敏捷、更具韧性。🚀
🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨
更多推荐
所有评论(0)