
Commit f7d0b37

Author: Hernan Gelaf-Romer (committed)
HBASE-28440: Add support for using mapreduce sort in HFileOutputFormat2 (not yet merged upstream)
1 parent 6690644 commit f7d0b37

File tree

8 files changed: +186 −15 lines


hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java

Lines changed: 4 additions & 0 deletions
@@ -346,6 +346,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
       conf.set(JOB_NAME_CONF_KEY, jobname);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -358,6 +359,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
+      conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
    }
  }

@@ -411,6 +413,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
      conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
      conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
      conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
      conf.set(JOB_NAME_CONF_KEY, jobname);

      // Rack-aware WAL processing configuration is set directly via command line to the same key
@@ -423,6 +426,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
+      conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
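
Both backup paths turn the flag on just before submitting their job and remove it again once the job has run, so the setting does not leak into other jobs that share the same Configuration. A minimal sketch of that pattern, with a hypothetical helper class name and a caller-supplied submission callback:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

public final class DiskSortToggleExample {
  // Enable the flag for a single submission, then remove it so later jobs on the
  // same Configuration fall back to the default in-memory cell sorting.
  static void runWithDiskBasedSorting(Configuration conf, Runnable submitJob) {
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    try {
      submitJob.run();
    } finally {
      conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
    }
  }
}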

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 28 additions & 6 deletions
@@ -33,11 +33,14 @@
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -71,18 +74,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
  /**
   * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
   */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(value));
+      context.write(wrap(value), new MapReduceExtendedCell(value));
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(Cell cell) {
+      if (diskBasedSortingEnabled) {
+        return new Import.CellWritableComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
    }
  }

@@ -109,14 +122,23 @@ public Job createSubmittableJob(String[] args) throws IOException {
    // Use standard HFileInputFormat which now supports location resolver automatically
    // HFileInputFormat will automatically detect and log rack-awareness configuration
    job.setInputFormatClass(HFileInputFormat.class);
-
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(Import.CellWritableComparable.class);
+      job.setSortComparatorClass(Import.CellWritableComparable.CellWritableComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
    if (hfileOutPath != null) {
      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
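
With the flag off, map output keys are plain row bytes and CellSortReducer sorts each row's cells in memory; with it on, every cell becomes its own shuffle key, the MapReduce framework's merge sort (which spills to disk) puts the cells in order, and PreSortedCellsReducer only rewrites the keys back to row-keyed output. A sketch of the equivalent wiring on a hand-built job, assuming the caller creates the Job; the class and job name are illustrative:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;

public final class DiskSortWiringExample {
  // Mirrors what MapReduceHFileSplitterJob does when the flag is set: cells are the
  // shuffle key, the framework sorts them, and the reducer converts keys back to
  // row keys for HFileOutputFormat2.
  static Job configure(Configuration conf) throws IOException {
    Job job = Job.getInstance(conf, "disk-sorted-bulk-load-example");
    if (HFileOutputFormat2.diskBasedSortingEnabled(conf)) {
      job.setMapOutputKeyClass(Import.CellWritableComparable.class);
      job.setSortComparatorClass(Import.CellWritableComparable.CellWritableComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    }
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    return job;
  }
}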

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 28 additions & 4 deletions
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +84,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -194,6 +196,11 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

+  @InterfaceAudience.Private
+  public static final String DISK_BASED_SORTING_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -579,12 +586,19 @@ private static void writePartitions(Configuration conf, Path partitionsPath,

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
-      ImmutableBytesWritable.class, NullWritable.class);
+    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+    Class<? extends Writable> keyClass =
+      diskBasedSortingEnabled ? Import.CellWritableComparable.class : ImmutableBytesWritable.class;
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        Writable writable = diskBasedSortingEnabled
+          ? new Import.CellWritableComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+          : startKey;
+
+        writer.append(writable, NullWritable.get());
      }
    } finally {
      writer.close();
@@ -631,6 +645,10 @@ public static void configureIncrementalLoad(Job job, TableDescriptor tableDescri
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

+  public static boolean diskBasedSortingEnabled(Configuration conf) {
+    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+  }
+
  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
@@ -652,7 +670,13 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
-    if (
+    boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+    if (diskBasedSorting) {
+      job.setMapOutputKeyClass(Import.CellWritableComparable.class);
+      job.setReducerClass(PreSortedCellsReducer.class);
+      job.setSortComparatorClass(Import.CellWritableComparable.CellWritableComparator.class);
+    } else if (
      KeyValue.class.equals(job.getMapOutputValueClass())
      || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
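
For the common bulk-load path nothing else changes for callers: setting the key before configureIncrementalLoad runs is enough, since the patched method then picks CellWritableComparable keys, the cell sort comparator and PreSortedCellsReducer instead of CellSortReducer. A minimal sketch, assuming the caller already has the Job, TableDescriptor and RegionLocator; the class name is illustrative:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

public final class IncrementalLoadOptIn {
  // Opt a standard incremental-load job into disk-based sorting before the
  // shuffle key, comparator and reducer classes are chosen.
  static void configure(Job job, TableDescriptor table, RegionLocator locator) throws IOException {
    Configuration conf = job.getConfiguration();
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
  }
}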

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Lines changed: 4 additions & 0 deletions
@@ -200,6 +200,10 @@ public CellWritableComparable(Cell kv) {
      this.kv = kv;
    }

+    public Cell getCell() {
+      return kv;
+    }
+
    @Override
    public void write(DataOutput out) throws IOException {
      int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class PreSortedCellsReducer
+  extends Reducer<Import.CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
+
+  @Override
+  protected void reduce(Import.CellWritableComparable key, Iterable<Cell> values, Context context)
+    throws IOException, InterruptedException {
+
+    int index = 0;
+    for (Cell cell : values) {
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())),
+        new MapReduceExtendedCell(cell));
+
+      if (++index % 100 == 0) {
+        context.setStatus("Wrote " + index + " cells");
+      }
+    }
+  }
+}

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java

Lines changed: 30 additions & 4 deletions
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -162,9 +163,10 @@ public void setup(Context context) throws IOException {
  /**
   * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer}
   */
-  static class WALCellMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
+  static class WALCellMapper extends Mapper<WALKey, WALEdit, WritableComparable<?>, Cell> {
    private Set<String> tableSet = new HashSet<>();
    private boolean multiTableSupport = false;
+    private boolean diskBasedSortingEnabled = false;

    @Override
    public void map(WALKey key, WALEdit value, Context context) throws IOException {
@@ -185,7 +187,7 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
          byte[] outKey = multiTableSupport
            ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
            : CellUtil.cloneRow(cell);
-          context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
+          context.write(wrapKey(outKey, cell), new MapReduceExtendedCell(cell));
        }
      }
    } catch (InterruptedException e) {
@@ -198,8 +200,22 @@ public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      String[] tables = conf.getStrings(TABLES_KEY);
      this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
+      this.diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
      Collections.addAll(tableSet, tables);
    }
+
+    private WritableComparable<?> wrapKey(byte[] key, Cell cell) {
+      if (this.diskBasedSortingEnabled) {
+        // Important to build a new cell with the updated key to maintain multi-table support
+        KeyValue kv = new KeyValue(key, 0, key.length, cell.getFamilyArray(),
+          cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
+          cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(),
+          KeyValue.Type.codeToType(cell.getType().getCode()), null, 0, 0);
+        return new Import.CellWritableComparable(kv);
+      } else {
+        return new ImmutableBytesWritable(key);
+      }
+    }
  }

  /**
@@ -377,7 +393,13 @@ public Job createSubmittableJob(String[] args) throws IOException {
    job.setJarByClass(WALPlayer.class);

    job.setInputFormatClass(WALInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(Import.CellWritableComparable.class);
+      job.setSortComparatorClass(Import.CellWritableComparable.CellWritableComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }

    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
@@ -396,7 +418,11 @@ public Job createSubmittableJob(String[] args) throws IOException {
      List<TableName> tableNames = getTableNameList(tables);

      job.setMapperClass(WALCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
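
WALPlayer reads the flag from its Configuration, so a bulk-load style run (HFile output) can opt in without touching WALPlayer itself. A sketch of driving it from Java, mirroring the new test; the WAL input directory, output path and table names are placeholders, and it assumes WALPlayer.BULK_OUTPUT_CONF_KEY is the existing option for producing HFiles instead of live writes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public final class WALPlayerDiskSortExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Sort cells through the MapReduce shuffle instead of inside the reducer's memory.
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    // Emit HFiles rather than writing to the target table directly (placeholder path).
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-hfiles");
    int exit = ToolRunner.run(conf, new WALPlayer(conf),
      new String[] { "/hbase/oldWALs", "sourceTable", "targetTable" });
    System.exit(exit);
  }
}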

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java

Lines changed: 2 additions & 1 deletion
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.util.ToolRunner;
@@ -172,7 +173,7 @@ private void testWALKeyValueMapper(final String tableConfigKey) throws Exception
    WALKey key = mock(WALKey.class);
    when(key.getTableName()).thenReturn(TableName.valueOf("table"));
    @SuppressWarnings("unchecked")
-    Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
+    Mapper<WALKey, WALEdit, WritableComparable<?>, Cell>.Context context = mock(Context.class);
    when(context.getConfiguration()).thenReturn(configuration);

    WALEdit value = mock(WALEdit.class);

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java

Lines changed: 44 additions & 0 deletions
@@ -114,6 +114,50 @@ public static void afterClass() throws Exception {
    logFs.delete(walRootDir, true);
  }

+  @Test
+  public void testDiskBasedSortingEnabled() throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    p.addColumn(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.addColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    WAL log = cluster.getRegionServer(0).getWAL(null);
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    configuration.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName = "_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000, configuration.getLong(optionName, 0));
+    assertEquals(0, ToolRunner.run(configuration, player,
+      new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
+
+    // verify the WAL was player into table 2
+    Get g = new Get(ROW);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+  }
+
  /**
   * Test that WALPlayer can replay recovered.edits files.
   */
