【Hbase】Hbase TableInputFormat、TableOutputFormat
TableInputFormat是Apache HBase中的一个重要的类,它允许MapReduce作业直接从HBase表中读取数据作为其输入。这使得HBase可以作为一个数据源,供MapReduce作业处理其存储的大规模数据集,而无需将数据导出到HDFS或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。TableInputFormat的作用TableInputFo
1.概述
2.TableInputFormat
TableInputFormat是Apache HBase中的一个重要的类,它允许MapReduce作业直接从HBase表中读取数据作为其输入。这使得HBase可以作为一个数据源,供MapReduce作业处理其存储的大规模数据集,而无需将数据导出到HDFS或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。
TableInputFormat的作用
TableInputFormat的主要作用是将HBase表中的数据转换成适合MapReduce作业处理的形式。它将HBase中的行映射为MapReduce作业中的键值对(<k, v>),其中k通常是一个ImmutableBytesWritable对象,代表HBase行的主键(RowKey),而v是一个Result对象,包含了该行的所有列族、列和版本的信息。
TableInputFormat的配置参数
TableInputFormat可以通过一系列的配置参数来定制扫描行为,这些参数可以通过Job的Configuration对象设置。以下是主要的配置参数及其用途:
-
hbase.mapreduce.inputtable:指定要扫描的HBase表的名称。
-
hbase.mapreduce.scan:可以通过
-
TableMapReduceUtil.convertScanToString(Scan scan)生成的字符串来指定一个Scan对象,从而控制扫描的具体行为。但是由于该方法不公开,一般会通过其他参数间接控制扫描行为。
-
hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop:分别定义扫描的起始RowKey和结束RowKey。
-
hbase.mapreduce.scan.column.family:指定要扫描的列族。
-
hbase.mapreduce.scan.columns:指定要扫描的列,多个列之间用空格分隔。
-
hbase.mapreduce.scan.timestamp:如果设置,将只扫描指定时间戳的数据。
-
hbase.mapreduce.scan.timerange.start 和 hbase.mapreduce.scan.timerange.end:分别定义时间戳范围的开始和结束,以限制扫描的时间范围。
-
hbase.mapreduce.scan.maxversions:定义扫描结果中每列的最大版本数。
-
hbase.mapreduce.scan.cacheblocks:如果设置为true,则在扫描过程中缓存数据块,以提高读取速度。
-
hbase.mapreduce.scan.cachedrows:定义每次读取的最多行数,用于优化读取性能。
-
hbase.mapreduce.scan.batchsize:定义每次读取的最多值的数量,这会影响内存使用和处理速度。
结合Map函数的说明
在MapReduce作业中,map()函数是处理TableInputFormat输出的核心部分。函数签名如下:
Java
public void map(ImmutableBytesWritable row, Result value, Context context)
在这个函数中:
-
row:类型为ImmutableBytesWritable,代表当前处理行的RowKey。
-
value:类型为Result,包含了当前行的所有列族、列和版本的数据。
-
context:类型为Context,用于向Reducer发送键值对或写入日志等。
在map()函数内部,你可以根据value中的数据进行各种处理,如过滤、聚合等,然后通过context.write()将处理后的结果发送给Reducer。
3.TableOutputFormat
TableOutputFormat是Apache HBase提供的一个用于将MapReduce作业的输出直接写入HBase表的类。与TableInputFormat相对应,TableOutputFormat使得MapReduce作业能够将处理后的结果直接存储回HBase,而无需先写入HDFS再导入HBase,从而简化了数据流并提高了效率。
TableOutputFormat的作用
TableOutputFormat的主要功能是在MapReduce作业完成时,将MapReduce作业的输出数据写回到HBase表中。它接收的输出数据类型是<KEY, VALUE>对,其中VALUE必须是Put或Delete对象。Put对象用于插入或更新HBase表中的行,而Delete对象用于删除HBase表中的行。
TableOutputFormat的配置参数
TableOutputFormat需要通过Job的Configuration对象进行配置,主要的配置参数及其用途包括:
-
hbase.mapred.outputtable:指定写入数据的目的HBase表的名称。
-
hbase.mapred.output.quorum:指定目标HBase表所在的HBase集群的Zookeeper配置信息,格式为:“zookeeper所在机器名(多个实例以逗号分隔):端口号:HBase根节点名”。例如:“zookeeper1.example.com,zookeeper2.example.com:2181:/hbase”。
-
hbase.mapred.output.quorum.port:Zookeeper服务器的端口号,虽然可以通过hbase.mapred.output.quorum中的格式指定,但有时也可以单独配置这个参数。
-
hbase.mapred.output.rs.class 和 hbase.mapred.output.rs.impl:这两个参数用于指定RegionServer的实现类和服务实现,但在实际应用中很少被直接配置,因为默认的实现通常足够满足需求。
使用场景
TableOutputFormat适用于以下场景:
-
数据更新:当MapReduce作业的结果是对现有HBase表的更新时,可以直接使用TableOutputFormat将更新写回表中。
-
数据加载:当需要将大量数据从HDFS或其他数据源批量导入HBase时,可以使用TableOutputFormat将数据直接写入HBase,避免了先导入临时表再进行数据迁移的复杂过程。
-
数据分析:在进行数据分析或数据清洗后,可以直接将处理后的结果写回HBase表,以供后续分析或应用使用。
总之,TableOutputFormat提供了将MapReduce作业的输出直接写入HBase的能力,极大地简化了数据处理流程,并提高了数据处理的效率和灵活性。
3.案例
TableInputFormat和TableOutputFormat的案例
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class HBaseTableIOExample {
public static class TestTableMapper extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
List<Cell> cells = value.listCells();
for (Cell cell : cells) {
// Assuming the family is 'cf' and qualifier is 'qual'
if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), "cf".getBytes())) {
if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), "qual".getBytes())) {
context.write(key, new ImmutableBytesWritable(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
}
}
}
}
public static class TestTableReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
byte[] rowKey = key.get();
Put put = new Put(rowKey);
for (ImmutableBytesWritable value : values) {
// Assuming we want to write back to column 'cf:qual'
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), value.get());
}
context.write(new ImmutableBytesWritable(rowKey), NullWritable.get());
context.getCounter("Custom", "ProcessedRows").increment(1);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "HBaseTableIOExample");
job.setJarByClass(HBaseTableIOExample.class);
// Set up input format
job.setInputFormatClass(TableInputFormat.class);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"));
TableMapReduceUtil.initTableMapperJob("test_input", scan, TestTableMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
// Set up output format
job.setOutputFormatClass(TableOutputFormat.class);
TableMapReduceUtil.initTableReducerJob("test_output", TestTableReducer.class, job);
job.setMapperClass(TestTableMapper.class);
job.setReducerClass(TestTableReducer.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
重要点:
-
Mapper: TestTableMapper从输入表test_input读取数据,提取特定的列族和列的值,并将其传递给Reducer。
-
Reducer: TestTableReducer接收来自Mapper的输出,处理数据(本例中为简单传递),并将其写入输出表test_output。
-
Job Setup: 主函数main设置了MapReduce作业的输入和输出格式,以及Mapper和Reducer类,并使用TableInputFormat和TableOutputFormat初始化作业。
确保在运行此示例之前,你已经在HBase中创建了test_input和test_output表,并且test_input表中包含了适当的数据。此外,你可能需要根据你的环境调整HBase和Hadoop的配置。
更多推荐
所有评论(0)