在这里插入图片描述
在这里插入图片描述

Spark集群搭建

一、基础集群脚本

1、远程调用脚本(remote_call.sh)

如果有传命令参数,则执行该命令;如果没有传命令参数,则不执行。

#!/bin/bash

cmd=$1
if [ ! $cmd ];then
        cmd="jps"
fi

# 提取集群免密通信的虚拟机主机名
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}'`

# 遍历所有主机
for host in $hosts;do
        echo "-------- $host --------"
        # 此处使用"$cmd"的原因是避免将命令中的空格识别为多条命令
        ssh root@$host "$cmd"
done
2、远程复制目录脚本(remote_copy.sh)

首先,要验证待复制的目录在本机是否存在;然后需要从/etc/hosts文件中获取除去当前主机名的其他主机名,并且对每个主机进行循环操作,先判断父目录是否存在,再进行递归复制。

#!/bin/bash

path=$1 # 本机目录

# 验证路径是否存在
if [ ! -e $path ];then
        echo "目录 $path 不存在,无法拷贝"
        exit 0
fi

# 提取集群中排除当前主机名之外的所有主机名
# 只需验证父目录是否存在即可
me=`hostname`
parent=`dirname $path`
hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}' | grep -v "$me"`

for host in $hosts;do
        scp -r $path root@$host:$parent
        echo "-------- 拷贝 $path$host 成功 --------"
done
3、Spark集群服务管理脚本

按照"先开的后关"的原则,按照"ZooKeeper-HDFS-YARN-Spark-启用其他Master节点"的次序进行集群的服务管理

#!/bin/bash

if [ "$1" == "start" ];then
        # 自定义启动服务函数,传入两个字符串参数:参数1为启动命令,参数2为启动方式
    function startService(){
                # 定义两个局部遍历依次接收用户传入的两个参数
                # service 为启动命令
                local service=$1
                # exeType 为启动方式(daemon:后台启动(用于那些会占用窗口的服务),非daemon表示正常启动)
                local exeType=$2
                if [ "$exeType" == "daemon" ];then
                        # 后台启动
                        nohup $service 1>/dev/null 2>&1 &
                else
                        # 正常启动
                        $service 1>/dev/null 2>&1
                fi
                # 判断执行启动脚本后的状态
                if [ $? -ne 0 ];then
                        # 因为服务按从上到下有依赖关系,所以任何一个脚本执行出错,将退出整个脚本,所以选择exit而非return
                        # 函数按通用规则 return 0:正常,非0:表示异常;若函数中return缺省,则默认返回最后一条命名的状态即改命令执行后的$?
                        echo "ERROR : FAIL TO EXECUTE $service IN MODE OF $exeType ... EXIT ..."
                        # 退出脚本
                        exit 1
                else
                        # 成功则正常输出提示结果
                        echo "INFO : SUCCESS TO EXECUTE $service IN MODE OF $exeType"
                fi
    }

        # 依次按服务依赖顺序调用启动服务函数,并为每次调用传入启动命令和启动方式
        # 启动 Zookeeper
        /root/install/remote_call.sh "zkServer.sh start"
        # 启动 hadoop hdfs
    startService "start-dfs.sh" "non-daemon"
        # 启动 hadoop yarn
    startService "start-yarn.sh" "non-daemon"
        # 启动 Spark
        startService "start-all.sh" "non-daemon"
        # 实现集群高可用
        ssh root@master02 "start-master.sh"

elif [ "$1" == "stop" ];then
        # 自定义启动服务函数,传入两个字符串参数:参数1为关闭命令,参数2为关闭方式
    function stopService(){
                # 服务名称或关闭命令
                local service=$1
                # 关闭方式(command:命令关闭,pid:根据服务查找进程编号(pid)在借助kill命令关闭)
                local exeType=$2
                if [ "$exeType" == "command" ];then
                        # 直接执行参数一命令关闭服务
                        $service 1>/dev/null 2>&1
                        # 根据关闭命令执行的状态展示结果
                        if [ $? -eq  0 ];then
                                        echo "INFO : SUCCESS TO EXECUTE $service"
                        else
                                        echo "ERROR : FAIL TO EXECUTE $service"
                        fi
                else
                        # 根据参数一传入的服务名称查看进程编号(pid)
                        local pid=$(jps -ml|grep $service|awk '{print $1}')
                        if [ "$pid" ];then
                                # 如果进程编号存在,则直接强制 kill
                                kill -9 $pid
                                # 根据kill的状态展示结果
                                if [ $? -eq  0 ];then
                                                        echo "INFO : SUCCESS TO KILL $service WITH PID $pid"
                                        else
                                                        echo "ERROR : FAIL TO KILL $service WITH PID $pid"
                                        fi
                        else
                                echo "INFO : NO SERVICE EXIST WITH NAME OF $service"
                        fi
                fi
    }

        # 根据服务的依赖关系,逆向逐个关闭服务
        # 关闭高可用另外起的master
        ssh root@master02 "stop-master.sh"
        # 关闭Spark
        stopService "stop-all.sh" "command"
        # 最后关闭 yarn 和 hdfs
    stopService "stop-yarn.sh" "command"
    stopService "stop-dfs.sh" "command"
        # 关闭ZooKeeper
        /root/install/remote_call.sh "zkServer.sh stop"
else
        # 附带查一下java进程
    jps -ml
fi

如果需要切换主节点的状态,可以先通过stop-master.sh使A-Master节点状态由Active->Standby,待B-Master根据机制切换为Active态之后再重启A-Master节点。

二、配置相关文件

cd /opt/software/spark-3.1.2/conf
// mv xxx.template xxx 将所有template文件去除后缀为正式配置文件
vim spark-env.sh
-------------------------------------------------------------------
export JAVA_HOME=/opt/software/jdk1.8.0_171

export HADOOP_CONF_DIR=/opt/software/hadoop-3.1.3/etc/hadoop

SPARK_MASTER_WEBUI_PORT=9090

SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=9091
-Dspark.history.fs.logDirectory=hdfs://master01:8020/spark_event_log_dir
-Dspark.history.retainedApplications=30
"

SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master01,master02,worker01
-Dspark.deploy.zookeeper.dir=/spark
"
-------------------------------------------------------------------

vim workers // 配置集群所有机器名称
-------------------------------------------------------------------
master01
master02
worker01
-------------------------------------------------------------------

vim spark-default.conf
-------------------------------------------------------------------
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://master01:8020/spark_event_log_dir
spark.yarn.historyServer.address=master01:9091
spark.history.ui.port=9091
-------------------------------------------------------------------

vim yarn-site.xml // [选配]:在启动Yarn模式时才需要配置
-------------------------------------------------------------------
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
-------------------------------------------------------------------

./remote_copy.sh /opt/software/spark-3.1.2/ // 拷贝Spark内容
./remote_copy.sh /etc/profile.d/myenv.sh // 拷贝环境变量
./remote_call.sh "source /etc/profile"
./remote_call.sh "/opt/software/service-bigdata.sh start" // 完整启动Spark集群中的所有服务

在此配置下,有以下关键信息:

  • 端口配置
    • Spark Master Web UI:9090
    • Spark HistoryServer UI:9091
  • 日志文件路径
    • 日志文件路径:hdfs://master01:8020/spark_event_log_dir
  • 关闭内存检查的原因:Spark任务吃资源,不能为了保证资源不被过度使用就强行终止Spark任务。

三、任务提交案例

1. 案例详解

ProductsAnalyzer类

val path = args(0) val dirPath = args(1) 表示输入和输出路径在提交任务时具体指定。

package envtest

import core.{SparkSessionBuilder, SparkSessionBuilderDeployer}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window

object ProductsAnalyzer {
  case class Product(name: String, price: Double, date: String, market: String, province: String, city: String)
  implicit class StrExt(line: String){
    // 香菜	2.80	2018/1/1	山西汾阳市晋阳农副产品批发市场	山西	汾阳
    val regexProduct = "(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)".r
    def toProduct: Product = {
      line match {
        // 使用模式匹配将字符串和正则表达式进行匹配,如果成功,则将各个匹配到的部分转换为Product对象并进行返回
        // java.lang.NumberFormatException: empty String:输入数据中的某些行可能不符合预期的格式,导致 toDouble 转换失败。解决方案是:使其转化为Product对象时,如果为空,则转化为0.0f
        case regexProduct(name, price, date, market, province, city) => Product(name, if (price.trim.isEmpty) 0.0f else price.toDouble, date, market, province, city)
        case _ => throw new RuntimeException(s"产品数据格式有误:$line")
      }
    }
  }
  def main(args: Array[String]): Unit = {
    /**
     * 在Edit Configurations中指定参数,通过args[0]和args[1]获取输入路径和输出路径
     */
    val path = args(0)
    val dirPath = args(1)

    val ssb: SparkSessionBuilderDeployer = SparkSessionBuilderDeployer()
    val spark = ssb.spark
    val sc: SparkContext = ssb.sc

    val rddProduct: RDD[Product] = sc.textFile(path)
      .mapPartitions(_.map(_.toProduct))

    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 统计每个省份下每个商品的平均价格和商品数量,并且在不同省份下按照商品数量创建排名指标
    spark.createDataFrame(rddProduct)
      .groupBy($"province",$"name")
      .agg(
        avg($"price").as("avg_price"),
        count("*").as("product_cnt")
      )
      .select($"province",$"name",$"avg_price",$"product_cnt",dense_rank().over(Window.partitionBy($"province").orderBy($"avg_price".desc)).as("rank"))
      .repartition(1)
      .write
      .mode(saveMode = "overwrite")
      .option("separator",",")
      .option("header", "true")
      .csv(dirPath)

    spark.stop()
  }
}

SparkSessionBuilderDeployer类

SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。

package core

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

// 封装SparkSession的创建方法
// SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
class SparkSessionBuilderDeployer(){
  lazy val config:SparkConf = {
    new SparkConf()
  }

  lazy val spark:SparkSession = {
    SparkSession.builder()
      .config(config)
      .getOrCreate()
  }

  lazy val sc:SparkContext = {
    spark.sparkContext
  }

  def stop(): Unit = {
    if (null != spark) {
      spark.stop()
    }
  }
}
object SparkSessionBuilderDeployer {
  def apply(): SparkSessionBuilderDeployer = new SparkSessionBuilderDeployer
}

在具体测试案例代码时,可以先通过对pathdirpath指定具体路径的方式来测试整体逻辑,再通过Edit Configurations设置具体参数来测试传参后的程序逻辑,测试完毕之后,即可直接将输入和输出路径分别修改为args(0)args(1),并且直接打包发布到虚拟机,供给Spark集群进行执行。

2. Spark Submit 选项详解
  • 主要选项

    • –class :

      • 指定应用程序的主类,即包含 main 方法的类。
      • 示例:--class envtest.ProductsAnalyzer
    • –master :

      • 指定 Spark Master 节点的 URL。

      • 示例:

        • Standalone 模式:

          --master spark://master01:7077
          
          • 表示 Spark Master 节点运行在 master01 主机的 7077 端口上。
        • YARN 模式:

          --master yarn
          
          • 使用 YARN 作为资源管理器。
    • –deploy-mode :

      • 指定应用程序的运行模式。
      • 模式:
        • client:Driver 程序运行在提交任务的本地机器上。
        • cluster:Driver 程序运行在集群的某个节点上。
      • 示例:--deploy-mode cluster
    • –name :

      • 指定应用程序的名称。
      • 示例:--name spark-sql-product
    • :

      • Spark 应用程序的 JAR 文件的路径,包含了主类及其依赖。
      • 示例:/root/spark/spark_sql-1.0.jar
    • <application-argument(s)>:

      • 应用程序的输入参数。
      • 示例:
        • 输入路径:hdfs://master02:8020/spark/data/products.txt
        • 输出路径:hdfs://master02:8020/spark/result/product_result
  • 具体代码

standalone

spark-submit \
--class envtest.ProductsAnalyzer \
--master spark://master01:7077 \
--deploy-mode client \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result

yarn-cluster

spark-submit \
--class envtest.ProductsAnalyzer \
--master yarn \
--deploy-mode cluster \
--name spark-sql-product \
/root/spark/spark_sql-1.0.jar \
hdfs://master02:8020/spark/data/products.txt \
hdfs://master02:8020/spark/result/product_result

在这里插入图片描述

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐