Commit a045e15

HBASE-28440: Add support for using mapreduce sort in HFileOutputFormat2 (not yet merged upstream) (#197)

1 parent 78bcd70 commit a045e15

10 files changed: +367 −62 lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java

Lines changed: 4 additions & 0 deletions

@@ -346,6 +346,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
       LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
     }
     conf.set(JOB_NAME_CONF_KEY, jobname);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
 
     BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
     int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -358,6 +359,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         + " finished.");
     } finally {
       deleteBulkLoadDirectory();
+      conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
     }
   }
 
@@ -411,6 +413,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
 
     // Rack-aware WAL processing configuration is set directly via command line to the same key
@@ -423,6 +426,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
       if (result != 0) {
        throw new IOException("WAL Player failed");
      }
+     conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
     conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
     conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
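
Note: the backup client only turns disk-based sorting on for the duration of its own MapReduce jobs (the HFile copy and the WAL replay) and clears the key afterwards, so later jobs that reuse the same Configuration fall back to the default sort. A minimal sketch of that enable-and-clean-up pattern, with the job abstraction left as a placeholder (not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

public class DiskBasedSortToggleSketch {

  // Placeholder for whatever actually runs (BackupCopyJob, WALPlayer, ...).
  interface ConfiguredJob {
    int run(Configuration conf) throws Exception;
  }

  // Illustrates the set / run / unset pattern used in incrementalCopyHFiles() and walToHFiles().
  static int runWithDiskBasedSort(Configuration conf, ConfiguredJob job) throws Exception {
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    try {
      return job.run(conf);
    } finally {
      // Clear the flag so unrelated jobs sharing this Configuration keep the default sort.
      conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
    }
  }
}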

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 30 additions & 7 deletions

@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -33,11 +34,14 @@
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -71,18 +75,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
   /**
    * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
    */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(NullWritable key, Cell value, Context context)
       throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(value));
+      ExtendedCell extendedCell = (ExtendedCell) value;
+      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
     }
 
     @Override
     public void setup(Context context) throws IOException {
-      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(ExtendedCell cell) {
+      if (diskBasedSortingEnabled) {
+        return new KeyOnlyCellComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
     }
   }
 
@@ -109,14 +123,23 @@ public Job createSubmittableJob(String[] args) throws IOException {
     // Use standard HFileInputFormat which now supports location resolver automatically
     // HFileInputFormat will automatically detect and log rack-awareness configuration
    job.setInputFormatClass(HFileInputFormat.class);
-
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
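
Note: with the flag off, the splitter keeps the old pipeline, where map output keys are raw row bytes (ImmutableBytesWritable) and CellSortReducer re-sorts each row's cells in reducer memory. With the flag on, the full cell key travels through the shuffle as a KeyOnlyCellComparable, the framework's disk-backed merge sort puts cells in CellComparator order, and PreSortedCellsReducer simply forwards them. A condensed sketch of the two shuffle wirings on a plain Job (job name is a placeholder; the package-private HFileCellMapper is omitted):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleWiringSketch {

  static Job newSplitterLikeJob(Configuration conf) throws IOException {
    Job job = Job.getInstance(conf, "hfile-splitter-sketch"); // placeholder name
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    if (HFileOutputFormat2.diskBasedSortingEnabled(conf)) {
      // Full cell keys are ordered by the shuffle's disk-backed merge sort.
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else {
      // Row bytes order the shuffle; cells are re-sorted per row inside the reducer.
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setReducerClass(CellSortReducer.class);
    }
    return job;
  }
}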

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 31 additions & 5 deletions

@@ -21,7 +21,6 @@
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
-
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
@@ -50,6 +49,7 @@
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +83,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -194,6 +195,11 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
 
+  @InterfaceAudience.Private
+  public static final String DISK_BASED_SORTING_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -579,12 +585,19 @@ private static void writePartitions(Configuration conf, Path partitionsPath,
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
-      ImmutableBytesWritable.class, NullWritable.class);
+    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+    Class<? extends Writable> keyClass =
+      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);
 
     try {
       for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        Writable writable = diskBasedSortingEnabled
+          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+          : startKey;
+
+        writer.append(writable, NullWritable.get());
       }
     } finally {
       writer.close();
@@ -631,6 +644,13 @@ public static void configureIncrementalLoad(Job job, TableDescriptor tableDescri
     configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
+  public static boolean diskBasedSortingEnabled(Configuration conf) {
+    boolean res =
+      conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+    LOG.info("Disk based sorting on: {}", res);
+    return res;
+  }
+
   static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -652,7 +672,13 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
-    if (
+    boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+    if (diskBasedSorting) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+      job.setReducerClass(PreSortedCellsReducer.class);
+    } else if (
       KeyValue.class.equals(job.getMapOutputValueClass())
         || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
     ) {
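
Note: nothing else changes for callers; setting the key on the Configuration before configureIncrementalLoad() runs is enough for it to swap in the KeyOnlyCellComparable key class, its comparator, the PreSortedCellsReducer, and the matching partition-file key type shown above. A minimal sketch of opting a bulk-load job in (table name, job name, and output path are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DiskBasedSortBulkLoadSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Opt in before configureIncrementalLoad() so it picks the disk-based sort path.
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);

    Job job = Job.getInstance(conf, "bulk-load-with-disk-based-sort"); // placeholder name
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("example")); // placeholder table
      RegionLocator locator = conn.getRegionLocator(table.getName())) {
      // Wires KeyOnlyCellComparable / KeyOnlyCellComparator / PreSortedCellsReducer
      // instead of the ImmutableBytesWritable + CellSortReducer combination.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
    }
    FileOutputFormat.setOutputPath(job, new Path(args[0])); // HFile output dir (placeholder)
  }
}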

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Lines changed: 4 additions & 0 deletions

@@ -200,6 +200,10 @@ public CellWritableComparable(Cell kv) {
       this.kv = kv;
     }
 
+    public Cell getCell() {
+      return kv;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java

Lines changed: 91 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class KeyOnlyCellComparable implements WritableComparable<KeyOnlyCellComparable> {

  static {
    WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
  }

  private ExtendedCell cell = null;

  public KeyOnlyCellComparable() {
  }

  public KeyOnlyCellComparable(ExtendedCell cell) {
    this.cell = cell;
  }

  public ExtendedCell getCell() {
    return cell;
  }

  @Override
  public int compareTo(KeyOnlyCellComparable o) {
    return CellComparator.getInstance().compare(cell, o.cell);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
    out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
    out.writeInt(keyLen);
    out.writeInt(valueLen);
    PrivateCellUtil.writeFlatKey(cell, out);
    out.writeLong(cell.getSequenceId());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    cell = KeyValue.create(in);
    long seqId = in.readLong();
    cell.setSequenceId(seqId);
  }

  public static class KeyOnlyCellComparator extends WritableComparator {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      try {
        KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
        kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
        KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
        kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
        return compare(kv1, kv2);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
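
Note: the comparable carries the whole cell key (row, family, qualifier, timestamp, type) plus the sequence id, so a shuffle keyed on it reproduces CellComparator ordering rather than plain row-byte ordering; writePartitions() above stores first-on-row cells in the same form as partition boundaries. A small illustration with made-up row keys:

import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyOnlyCellComparableSketch {

  public static void main(String[] args) {
    // First-on-row keys, the same shape writePartitions() appends as partition boundaries.
    ExtendedCell a = KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-001"));
    ExtendedCell b = KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-002"));

    // Ordering follows CellComparator, so row-001 sorts ahead of row-002.
    int cmp = new KeyOnlyCellComparable(a).compareTo(new KeyOnlyCellComparable(b));
    System.out.println(cmp < 0); // true
  }
}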
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java

Lines changed: 46 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class PreSortedCellsReducer
  extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {

  @Override
  protected void reduce(KeyOnlyCellComparable key, Iterable<Cell> values, Context context)
    throws IOException, InterruptedException {

    int index = 0;
    for (Cell cell : values) {
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())),
        new MapReduceExtendedCell(cell));

      if (++index % 100 == 0) {
        context.setStatus("Wrote " + index + " cells");
      }
    }
  }
}
