🐇明明跟你说过:个人主页

🏅个人专栏:《大数据前沿:技术与应用并进》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、Hadoop简介

二、Kafka基础

1、Kafka核心概念

2、Kafka核心组件

3、Kafka主要特性

4、Kafka使用场景

三、Hadoop生态系统概览

1、Hadoop核心组件

2、Hive 

3、HBase

4、Hadoop在大数据处理中的应用场景

1. 数据存储与管理

2. 数据处理与分析

3. 数据仓库与查询

4. 流数据处理

四、Kafka与Hadoop集成的必要性

1、集成的优势

2、实际应用场景

五、Kafka与Hadoop集成案例

1、使用Logstash从Kafka到Hadoop的数据传输

1. 环境准备

2. 创建topic

3. 创建logstash配置文件

4. 安装HDFS插件 

5. 启动logstash

2、Apache Spark作为中间层:从Kafka读取数据并写入Hadoop

1. 从 Kafka 读取数据

2. 数据处理

3. 写入 Hadoop HDFS


一、引言

1、Kafka简介

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源,现在由 Apache Software Foundation 进行维护。Kafka 旨在提供一个统一、高吞吐量、低延迟的平台,用于处理实时数据流。它通常用于构建实时数据管道和流式应用。

更多 Kafka 介绍请参考《大数据领域的重要基础设施——Kafka入门篇》 

2、Hadoop简介

Apache Hadoop 是一个开源的分布式计算框架,由 Apache 软件基金会开发和维护。它用于处理和存储大规模数据集,通常被称为“大数据”。Hadoop 的设计目标是提供一个可靠、可扩展和高效的平台,用于分布式数据处理。

更多 Hadoop 介绍请参考《【Hadoop】核心组件深度剖析:HDFS、YARN与MapReduce的奥秘》 

二、Kafka基础

1、Kafka核心概念

  1. Producer: 生产者是将数据发布到 Kafka 主题中的客户端应用程序。生产者负责将数据发送到 Kafka 集群。
  2. Consumer: 消费者是从 Kafka 主题中读取数据的客户端应用程序。消费者订阅一个或多个主题,并从中消费数据。
  3. Broker: Kafka 集群由多个 Kafka 实例(称为 broker)组成,每个 broker 负责处理和存储一部分数据。Broker 之间通过分区和副本机制实现数据的分布式存储和高可用性。
  4. Topic: 主题是 Kafka 中的消息分类或类别。生产者将消息发布到主题,消费者从主题中读取消息。每个主题可以分为多个分区(partition),以实现并行处理和扩展。
  5. Partition: 分区是 Kafka 主题的基本单元,每个主题可以包含一个或多个分区。每个分区是一个有序的、不可变的消息队列。分区有助于实现数据的并行处理和负载均衡。
  6. Offset: 每条消息在其所在的分区中都有一个唯一的标识符,称为偏移量(offset)。消费者使用偏移量来跟踪已经消费的消息位置。

 

2、Kafka核心组件

  1. Kafka Connect: Kafka Connect 是 Kafka 的一个组件,用于简化将数据从外部系统(例如数据库、文件系统等)导入和导出到 Kafka 的过程。它提供了许多预构建的连接器,可以轻松集成各种数据源和目标。
  2. Kafka Streams: Kafka Streams 是一个用于构建流处理应用程序的客户端库。它允许开发者创建高度可扩展、容错的流处理应用程序,以便实时处理和分析数据流。
  3. ZooKeeper: Kafka 使用 Apache ZooKeeper 进行分布式协调,管理集群的元数据,包括主题、分区、broker 等信息。

 

3、Kafka主要特性

  1. 高吞吐量: Kafka 可以处理大量的数据流,并且在低延迟下提供高吞吐量的消息传输。
  2. 可扩展性: 通过增加更多的 broker,可以轻松扩展 Kafka 集群的容量和性能。
  3. 持久性和容错性: Kafka 将数据持久化到磁盘,并通过复制机制实现高可用性,确保在硬件故障时数据不会丢失。
  4. 分区和并行处理: 通过将主题划分为多个分区,Kafka 支持高效的并行处理,从而提高数据处理的速度和效率。

4、Kafka使用场景

  1. 实时流数据管道: Kafka 通常用于构建实时数据管道,将数据从生产者传输到消费者。
  2. 数据集成: 使用 Kafka Connect,将不同数据源的数据集成到统一的 Kafka 平台。
  3. 实时分析和监控: 利用 Kafka Streams 或其他流处理框架,可以对实时数据进行分析和监控。

三、Hadoop生态系统概览

1、Hadoop核心组件

1. Hadoop 分布式文件系统 (HDFS):

  • HDFS 是 Hadoop 的存储层,专为大规模数据存储而设计。
  • 它将数据分成块(通常为 128 MB 或 256 MB),并在集群中的多个节点上进行复制和存储,以确保数据的高可用性和容错性。
  • HDFS 的主从架构包括一个 NameNode(管理文件系统的元数据)和多个 DataNode(存储实际数据)。

2. MapReduce:

  • MapReduce 是 Hadoop 的计算模型和处理引擎,用于大规模数据处理。
  • 它将计算任务分成两个阶段:Map 阶段和 Reduce 阶段。Map 阶段处理输入数据并生成中间结果,Reduce 阶段汇总中间结果并生成最终输出。
  • MapReduce 编程模型易于扩展,可以在数千个节点上并行处理数据。

3. Yet Another Resource Negotiator (YARN):

  • YARN 是 Hadoop 的资源管理层,用于集群资源的管理和调度。
  • 它分离了资源管理和作业调度,提供了更好的集群资源利用率和灵活性。
  • YARN 允许多种数据处理框架(如 MapReduce、Spark、Tez)在同一个 Hadoop 集群上运行。

4. Hadoop Common:

  • Hadoop Common 包含 Hadoop 框架使用的通用实用工具和库,支持其他 Hadoop 模块的开发和操作。

2、Hive 

Apache Hive 是一个基于 Hadoop 的数据仓库软件,用于处理和查询存储在 Hadoop 分布式文件系统(HDFS)中的大规模数据集。它提供了一种类似 SQL 的查询语言,称为 HiveQL,用于数据分析和处理。

1. 简介

  • Hive 由 Facebook 开发,随后成为 Apache 软件基金会的顶级项目。它旨在让熟悉 SQL 的用户能够轻松在 Hadoop 上进行数据处理,而不需要编写复杂的 MapReduce 代码。

2. 工作原理

  1. 查询解析:用户通过 CLI、Web UI 或 Thrift 服务提交 HiveQL 查询。
  2. 查询编译:查询由 Driver 解析,并由 Compiler 编译为执行计划。
  3. 优化执行计划:执行计划经过优化,以提高查询效率。
  4. 执行查询:优化后的执行计划提交给执行引擎(如 MapReduce、Tez 或 Spark)执行。
  5. 返回结果:执行结果通过 Driver 返回给用户。

3. 使用场景

  • 数据仓库:Hive 主要用于构建数据仓库,支持复杂的数据分析和处理任务。
  • 数据分析:Hive 可以进行大规模数据分析,适用于日志分析、业务报告等。
  • ETL 处理:Hive 支持数据的抽取、转换和加载(ETL)过程,可以将不同来源的数据整合并进行处理。

3、HBase

Apache HBase 是一个开源的、分布式的、面向列的数据库,运行在 Hadoop 之上。它旨在提供对大规模结构化数据的随机、实时读写访问,类似于 Google 的 Bigtable。

HBase 是一个非关系型数据库(NoSQL),使用 HDFS 作为底层存储,适用于存储和处理大规模的稀疏表。它为 Hadoop 提供了一个高可靠性、高性能、高伸缩性的数据库服务。

1. 工作原理

  1. 数据写入:数据首先写入内存中的 MemStore,然后异步地写入 HDFS 中的 HFile。数据写入时也会记录到 Write-Ahead Log(WAL)以保证数据的可靠性。
  2. 数据读取:数据从内存(MemStore)和 HDFS 中的 HFile 读取,读取时会根据行键定位到相应的 RegionServer,再由 RegionServer 进行数据检索。
  3. 分区管理:当表的数据量增加到一定程度时,Region 会进行分裂(Split),从而将数据分布到更多的 RegionServer 上以均衡负载。

2. 使用场景

  • 实时分析:适用于需要低延迟、大吞吐量的数据访问场景,如实时数据分析和处理。
  • 数据存储:用于存储大规模的结构化和半结构化数据,如物联网数据、日志数据等。
  • 社交网络:管理社交网络数据,处理用户关系和消息流。
  • 时序数据:存储和查询时序数据,如监控数据、传感器数据等。

   

4、Hadoop在大数据处理中的应用场景

1. 数据存储与管理

Hadoop 分布式文件系统 (HDFS) 提供了一个高可靠性、高可扩展性和高容错的数据存储系统,适用于存储海量数据。

  • 海量数据存储:能够存储和管理 PB 级的数据,适用于需要存储大规模、结构化和非结构化数据的应用场景,如企业日志、社交媒体数据、传感器数据等。
  • 分布式数据管理:利用 HDFS 可以将数据分布在集群中的多台机器上,提高数据存储和管理的效率和可靠性。

2. 数据处理与分析

MapReduce 是 Hadoop 的核心组件之一,提供了一种分布式数据处理模型,适用于大规模数据处理和分析。

  • 大数据处理:能够处理 TB 级到 PB 级的数据,广泛应用于大数据分析、数据挖掘、机器学习等领域。
  • 批处理:MapReduce 适合处理需要批量处理的任务,如日志处理、网页索引、图像处理等。

   

3. 数据仓库与查询

Hive 提供了一个基于 Hadoop 的数据仓库解决方案,可以使用类似 SQL 的查询语言 (HiveQL) 对存储在 HDFS 上的数据进行查询和分析。

  • 数据仓库:适用于构建大规模数据仓库,用于存储和管理企业的大量历史数据。
  • 数据查询和分析:用户可以使用 HiveQL 进行复杂的数据查询和分析,而无需了解底层的 MapReduce 实现。

4. 流数据处理

Hadoop 与流处理框架(如 Apache Storm、Apache Flink)集成,提供了实时数据处理能力。

  • 实时数据处理:适用于需要实时处理和分析的数据场景,如实时日志分析、在线交易监控、社交媒体数据分析等。
  • 事件驱动处理:处理实时流数据中的事件,能够快速响应数据变化和事件触发。

     

四、Kafka与Hadoop集成的必要性

1、集成的优势

1. 实时与离线处理结合:

  • 混合处理架构:通过将 Kafka 和 Hadoop 集成,可以构建一个既能处理实时数据流,又能进行批量数据分析的混合处理架构。Kafka 负责实时数据流的处理,Hadoop 负责离线数据的存储和批处理。
  • 数据流的持久化:实时数据流通过 Kafka 进入系统后,可以定期将数据导入 Hadoop 中进行持久化和深度分析。

2. 扩展性和灵活性:

  • 高扩展性:Kafka 和 Hadoop 都具有高扩展性,能够处理大规模的数据。Kafka 可以处理百万级别的消息吞吐量,Hadoop 可以处理 PB 级别的数据存储和分析。
  • 灵活的数据管道:通过 Kafka,可以灵活地构建数据管道,将数据从生产者传输到消费者,并最终导入 Hadoop 中进行处理。

3. 简化数据集成和管理:

  • 统一的数据平台:集成 Kafka 和 Hadoop 可以构建一个统一的数据平台,简化数据的管理和集成。Kafka 负责实时数据的传输,Hadoop 负责数据的存储和批处理,形成一个完整的数据处理链条。
  • 数据一致性:通过统一的数据平台,可以确保数据的一致性和完整性,从而提高数据处理的效率和质量。

   

2、实际应用场景

1. 日志收集和分析:

  • 实时日志收集:通过 Kafka 收集各类系统日志、应用日志等,实现实时的日志监控和处理。
  • 批量日志分析:将日志数据定期导入 Hadoop 中,通过 MapReduce 或者 Hive 进行离线的日志分析,挖掘日志中的有价值信息。

2. 实时数据分析:

  • 实时流处理:使用 Kafka 和流处理框架(如 Apache Storm、Apache Flink)进行实时数据分析和处理,如实时报警、实时推荐等。
  • 离线数据存储和分析:将实时处理后的数据存储在 HDFS 中,通过 Hadoop 进行离线的数据分析和挖掘,提供长期的历史数据分析能力。

   

五、Kafka与Hadoop集成案例

1、使用Logstash从Kafka到Hadoop的数据传输

使用 Logstash 从 Kafka 到 Hadoop 的数据传输是一个常见的场景,能够实现实时数据流处理和批量数据存储分析的结合。下面是本次案例的流量示意图

1. 环境准备

 安装 Logstash:确保在你的系统上已经安装了 Logstash。

Kafka 集群:确保Kafka 集群正常运行,并且能够从中消费消息。

Hadoop 环境:确保 HDFS 正常运行,可以将数据写入 HDFS。 

2. 创建topic

登录到Kafka服务器,进入到Kafka的bin目录下,输入如下命令创建一个topic

./kafka-topics.sh --bootstrap-server kafka0:9092 --topic test  --create

3. 创建logstash配置文件

进入到logstash的conf目录下,创建配置文件 test.conf 

input{
  kafka {
    bootstrap_servers => "192.168.40.100:9092,192.168.40.101:9092,192.168.40.102:9092"
    topics => ["test"]
    group_id => "logstashGroup"
    codec => "json"
  }
}

output {
  hdfs {
    path => "hdfs://192.168.40.130:8020/user/logs/%{+YYYY}/%{+MM}/%{+dd}/%{+HH}/logstash-%{+YYYY-MM-dd-HH}.log"
    codec => "json_lines"
    idle_flush_time => 10
  }
}
  • bootstrap_servers:指定 Kafka 集群的地址和端口。Logstash 会连接到这些 Kafka broker 来消费数据。这里提供了三个 Kafka broker 的地址,确保 Kafka 集群的高可用性。
  • topics:指定要消费的 Kafka 主题。在此配置中,test 是要从 Kafka 消费的主题名。Logstash 会从此主题获取消息。
  • group_id:定义 Kafka 消费者组 ID。多个 Logstash 实例如果使用相同的 group_id,它们将共享消费数据,从而避免重复消费。每个消费者组中的每个消费者会处理 Kafka 分区中的一部分数据。
  • codec:json 编解码器指定 Logstash 会把接收到的 Kafka 消息解析为 JSON 格式。这意味着 Kafka 中的每条消息必须是有效的 JSON 格式数据。
  • path:定义了输出日志的路径。这个路径指向 Hadoop HDFS 的一个位置,其中的 %{+YYYY}, %{+MM}, %{+dd}, %{+HH} 是动态替换的时间格式标记。日志文件会根据当前日期和小时被分区和命名。例如,文件路径可能是 hdfs://192.168.40.130:8020/user/logs/2024/11/18/14/logstash-2024-11-18-14.log,每小时生成一个日志文件。
  • hdfs://192.168.40.130:8020 是 HDFS 的 URI 和端口。
  • %{+YYYY}, %{+MM}, %{+dd}, %{+HH} 是时间格式化标记,Logstash 会根据事件发生的时间动态填充这些字段。
  • codec:json_lines 指定输出数据的编码格式为 JSON 行格式(每一行是一个独立的 JSON 对象)。这种格式适合处理大规模的日志文件,因为它支持逐行解析和处理。
  • idle_flush_time:指定 Logstash 在没有新数据到来时,等待的时间(以秒为单位)在将数据写入 HDFS 前进行刷新。这里设置为 10 秒,意味着如果 10 秒内没有新的数据,Logstash 会将已经积累的数据写入 HDFS。

   

4. 安装HDFS插件 

进入到logstash的bin目录下,执行如下命令安装hdfs插件

./logstash-plugin install logstash-output-hdfs

5. 启动logstash

进入到logstash的bin目录下,执行如下命令启动logstash

./logstash -f /opt/logstash/config/test.conf

2、Apache Spark作为中间层:从Kafka读取数据并写入Hadoop

使用 Apache Spark 作为中间层,从 Kafka 读取数据并将其写入 Hadoop,是处理大规模数据流的常见架构。这种方法通常用于实时数据处理和存储,尤其适用于需要高吞吐量和低延迟的数据流平台。

1. 从 Kafka 读取数据

Spark 提供了一个专门的连接器,可以从 Kafka 读取实时流数据。你可以使用 Structured Streaming 来读取 Kafka 中的数据流,并将其转化为 Spark 的数据帧(DataFrame)或数据集(Dataset)。配置 Kafka 读取源,以下是一个python示例

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("KafkaToHadoop") \
    .getOrCreate()

# 从 Kafka 中读取数据
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "your_topic") \
    .load()

# 转换数据
kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

在上述代码中,readStream 使得我们可以从 Kafka 消费消息流,并通过 option("subscribe", "your_topic") 来指定要订阅的 Kafka 主题。

2. 数据处理

在 Spark 中,我们可以对从 Kafka 获取的数据流进行实时处理。Spark 提供了强大的数据处理能力,可以对流数据进行清洗、转换、聚合等操作。

processed_df = kafka_df.select("key", "value")
# 可以进行更多的数据转换或处理


3. 写入 Hadoop HDFS

Spark 也提供了对 Hadoop HDFS 的支持,可以将处理后的数据写入 HDFS。我们可以选择批处理模式或流处理模式,根据需求来选择合适的方式。

配置写入 HDFS

processed_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .option("path", "hdfs://namenode_host:8020/user/hadoop/logs/") \
    .start()


在这里:

  • outputMode("append") 表示数据是以追加的方式写入 HDFS。
  • 使用 Parquet 格式是因为它支持高效的数据压缩和列式存储,非常适合大数据处理。
  • checkpointLocation 是存储流处理检查点信息的路径,确保数据的一致性和容错。

   

 💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!   

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐