Merged
@@ -346,6 +346,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
}
conf.set(JOB_NAME_CONF_KEY, jobname);
conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);

BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -358,6 +359,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
+ " finished.");
} finally {
deleteBulkLoadDirectory();
conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
}
}

@@ -411,6 +413,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
conf.set(JOB_NAME_CONF_KEY, jobname);

// Rack-aware WAL processing configuration is set directly via command line to the same key
@@ -423,6 +426,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
if (result != 0) {
throw new IOException("WAL Player failed");
}
conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
conf.unset(JOB_NAME_CONF_KEY);
} catch (IOException e) {
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -33,11 +34,14 @@
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -71,18 +75,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
/**
* A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
*/
static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {

private boolean diskBasedSortingEnabled = false;

@Override
public void map(NullWritable key, Cell value, Context context)
throws IOException, InterruptedException {
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
new MapReduceExtendedCell(value));
ExtendedCell extendedCell = (ExtendedCell) value;
context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
}

@Override
public void setup(Context context) throws IOException {
// do nothing
diskBasedSortingEnabled =
HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
Comment on lines +91 to +92

Collaborator: Maybe paranoid, but it could be worth a debug log indicating the gate status here.

Collaborator Author: Makes sense, I'll add it in the underlying method so we log every time we poll the config.

(A sketch of the suggested gate-status log follows the mapper class below.)
}

private WritableComparable<?> wrap(ExtendedCell cell) {
if (diskBasedSortingEnabled) {
return new KeyOnlyCellComparable(cell);
}
return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
}
}
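
A minimal sketch of the reviewer's suggestion above, not part of the merged change (which logs the gate status from HFileOutputFormat2.diskBasedSortingEnabled instead): assuming HFileCellMapper has access to an SLF4J Logger named LOG, setup() could record which shuffle key type the mapper will emit.

@Override
public void setup(Context context) throws IOException {
  diskBasedSortingEnabled =
    HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
  // Hypothetical debug log of the gate status; the merged change emits an
  // equivalent line from the shared config helper every time it is polled.
  LOG.debug("Disk-based sorting enabled for HFileCellMapper: {}", diskBasedSortingEnabled);
}
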

@@ -109,14 +123,23 @@ public Job createSubmittableJob(String[] args) throws IOException {
// Use standard HFileInputFormat which now supports location resolver automatically
// HFileInputFormat will automatically detect and log rack-awareness configuration
job.setInputFormatClass(HFileInputFormat.class);

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
if (diskBasedSortingEnabled) {
job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
} else {
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
}
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
TableName tableName = TableName.valueOf(tabName);
job.setMapperClass(HFileCellMapper.class);
job.setReducerClass(CellSortReducer.class);
if (diskBasedSortingEnabled) {
job.setReducerClass(PreSortedCellsReducer.class);
} else {
job.setReducerClass(CellSortReducer.class);
}
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
@@ -21,7 +21,6 @@
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
@@ -50,6 +49,7 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +83,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -194,6 +195,11 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

@InterfaceAudience.Private
public static final String DISK_BASED_SORTING_ENABLED_KEY =
"hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;

public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -579,12 +585,19 @@ private static void writePartitions(Configuration conf, Path partitionsPath,

// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
ImmutableBytesWritable.class, NullWritable.class);
boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
Class<? extends Writable> keyClass =
diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

try {
for (ImmutableBytesWritable startKey : sorted) {
writer.append(startKey, NullWritable.get());
Writable writable = diskBasedSortingEnabled
? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
: startKey;

writer.append(writable, NullWritable.get());
}
} finally {
writer.close();
@@ -631,6 +644,13 @@ public static void configureIncrementalLoad(Job job, TableDescriptor tableDescri
configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
}

public static boolean diskBasedSortingEnabled(Configuration conf) {
boolean res =
conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
LOG.info("Disk based sorting on: {}", res);
return res;
}

static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
@@ -652,7 +672,13 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (
boolean diskBasedSorting = diskBasedSortingEnabled(conf);

if (diskBasedSorting) {
job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
job.setReducerClass(PreSortedCellsReducer.class);
} else if (
KeyValue.class.equals(job.getMapOutputValueClass())
|| MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
) {
@@ -200,6 +200,10 @@ public CellWritableComparable(Cell kv) {
this.kv = kv;
}

public Cell getCell() {
return kv;
}

@Override
public void write(DataOutput out) throws IOException {
int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class KeyOnlyCellComparable implements WritableComparable<KeyOnlyCellComparable> {

static {
WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
}

private ExtendedCell cell = null;

public KeyOnlyCellComparable() {
}

public KeyOnlyCellComparable(ExtendedCell cell) {
this.cell = cell;
}

public ExtendedCell getCell() {
return cell;
}

@Override
public int compareTo(KeyOnlyCellComparable o) {
return CellComparator.getInstance().compare(cell, o.cell);
}

@Override
public void write(DataOutput out) throws IOException {
int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
int valueLen = 0; // We avoid writing the value here, so just serialize as if it were an empty value.
out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
out.writeInt(keyLen);
out.writeInt(valueLen);
PrivateCellUtil.writeFlatKey(cell, out);
out.writeLong(cell.getSequenceId());
}

@Override
public void readFields(DataInput in) throws IOException {
cell = KeyValue.create(in);
long seqId = in.readLong();
cell.setSequenceId(seqId);
Collaborator Author: I wanted to avoid creating a new class and wanted to use CellWritableComparable, but its write/read handling of the sequenceId caused issues with sorting. So this entire class exists to write the cell coordinates plus the sequenceId. (A usage sketch follows this file.)
}

public static class KeyOnlyCellComparator extends WritableComparator {

@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
return compare(kv1, kv2);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
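
To show how the pieces above fit together, here is a hedged usage sketch mirroring the wiring this PR adds in HFileOutputFormat2.configureIncrementalLoad and MapReduceHFileSplitterJob when the gate is on; the class name and job name below are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;

public class DiskSortedBulkLoadSketch {
  public static Job newJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Turn on the gate so helpers that poll the config choose the pre-sorted path.
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);

    Job job = Job.getInstance(conf, "disk-sorted-bulk-load-sketch");
    // Shuffle keys carry the cell key plus sequenceId, so MapReduce's merge sort
    // orders cells on disk rather than buffering them in the reducer.
    job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
    job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    // Values arrive already ordered; the reducer just re-keys them by row for HFileOutputFormat2.
    job.setReducerClass(PreSortedCellsReducer.class);
    return job;
  }
}
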
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class PreSortedCellsReducer
extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {

@Override
protected void reduce(KeyOnlyCellComparable key, Iterable<Cell> values, Context context)
throws IOException, InterruptedException {

int index = 0;
for (Cell cell : values) {
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())),
new MapReduceExtendedCell(cell));

if (++index % 100 == 0) {
context.setStatus("Wrote " + index + " cells");
}
}
}
}