From a5aa9405d490b009635b8adb6350562ccfccca91 Mon Sep 17 00:00:00 2001
From: Hernan Romer
Date: Tue, 1 Jul 2025 11:59:07 -0400
Subject: =?UTF-8?q?Revert=20"HubSpot=20Backport:=20HBASE-29386?=
 =?UTF-8?q?=20SnapshotProcedure=20and=20EnableTableProced=E2=80=A6"=20(#18?=
 =?UTF-8?q?7)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts commit 72edb0c757d8f431d80843c3fa747e56d2e9dea5.
---
 .../master/procedure/SnapshotProcedure.java   | 24 ++++
 .../hbase/master/procedure/TableQueue.java    |  2 +-
 .../TestSnapshotProcedureConcurrently.java    | 57 -------------------
 .../procedure/TestSnapshotProcedureRIT.java   |  3 +-
 4 files changed, 27 insertions(+), 59 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
index 572cdce0cf6f..a554f967ab9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
@@ -103,6 +103,30 @@ public TableOperationType getTableOperationType() {
     return TableOperationType.SNAPSHOT;
   }
 
+  @Override
+  protected LockState acquireLock(MasterProcedureEnv env) {
+    // AbstractStateMachineTableProcedure acquires exclusive table lock by default,
+    // but we may need to downgrade it to shared lock for some reasons:
+    // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details.
+    // b. we want to support taking multiple different snapshots on same table on the same time.
+    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  @Override
+  protected void releaseLock(MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+  }
+
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    // In order to avoid enabling/disabling/modifying/deleting table during snapshot,
+    // we don't release lock during suspend
+    return true;
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
index 8fd44079e11a..ec85cddafa0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
@@ -52,7 +52,6 @@ static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
       case CREATE:
       case DELETE:
       case DISABLE:
-      case SNAPSHOT:
       case ENABLE:
         return true;
       case EDIT:
@@ -60,6 +59,7 @@ static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
         return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
       case READ:
       case FLUSH:
+      case SNAPSHOT:
         return false;
       // region operations are using the shared-lock on the table
       // and then they will grab an xlock on the region.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java index 505bc4a49a9a..b4c62aa35dcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.master.procedure; -import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -28,7 +27,6 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -138,59 +136,4 @@ public void run() { SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotOnSameTableProto, TABLE_NAME, CF); } - - @Test - public void testItFailsIfTableIsNotDisabledOrEnabled() throws Exception { - ProcedureExecutor executor = master.getMasterProcedureExecutor(); - MasterProcedureEnv env = executor.getEnvironment(); - TEST_UTIL.getAdmin().disableTable(TABLE_NAME); - - TestEnableTableProcedure enableTable = new TestEnableTableProcedure( - master.getMasterProcedureExecutor().getEnvironment(), TABLE_NAME); - long enableProcId = executor.submitProcedure(enableTable); - TEST_UTIL.waitFor(60000, () -> { - Procedure proc = executor.getProcedure(enableProcId); - if (proc == null) { - return false; - } - return ((TestEnableTableProcedure) proc).getProcedureState() - == ENABLE_TABLE_MARK_REGIONS_ONLINE; - }); - - // Using a delayed spy ensures we hit the problem state while the table enable procedure - // is waiting to run - SnapshotProcedure snapshotProc = new SnapshotProcedure(env, snapshotProto); - long snapshotProcId = executor.submitProcedure(snapshotProc); - TEST_UTIL.waitTableEnabled(TABLE_NAME); - // Wait for procedure to run and finish - TEST_UTIL.waitFor(60000, () -> executor.getProcedure(snapshotProcId) != null); - TEST_UTIL.waitFor(60000, () -> executor.getProcedure(snapshotProcId) == null); - - SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); - } - - // Needs to be publicly accessible for Procedure validation - public static class TestEnableTableProcedure extends EnableTableProcedure { - // Necessary for Procedure validation - public TestEnableTableProcedure() { - } - - public TestEnableTableProcedure(MasterProcedureEnv env, TableName tableName) { - super(env, tableName); - } - - public MasterProcedureProtos.EnableTableState getProcedureState() { - return getState(stateCount); - } - - @Override - protected Flow executeFromState(MasterProcedureEnv env, - MasterProcedureProtos.EnableTableState state) throws InterruptedException { - if (state == ENABLE_TABLE_MARK_REGIONS_ONLINE) { - Thread.sleep(10000); - } - - return super.executeFromState(env, state); - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java index e7cfeae324e7..60e96f7b80dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java @@ -53,7 +53,8 @@ public void testTableInMergeWhileTakingSnapshot() throws Exception { () -> procExec.getProcedure(mergeProcId).getState() == ProcedureState.RUNNABLE); SnapshotProcedure sp = new SnapshotProcedure(procExec.getEnvironment(), snapshotProto); long snapshotProcId = procExec.submitProcedure(sp); - TEST_UTIL.waitFor(2000, 1000, () -> procExec.getProcedure(snapshotProcId) != null); + TEST_UTIL.waitFor(2000, 1000, () -> procExec.getProcedure(snapshotProcId) != null + && procExec.getProcedure(snapshotProcId).getState() == ProcedureState.WAITING_TIMEOUT); ProcedureTestingUtility.waitProcedure(procExec, snapshotProcId); SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); } From 7c2356df649f4fc64b094fe3714984ff576fd13a Mon Sep 17 00:00:00 2001 From: Hernan Romer Date: Mon, 21 Jul 2025 09:07:30 -0400 Subject: [PATCH 2/8] HubSpot Backport: HBASE-29447: Fix WAL archives cause incremental backup failures (will be in 2.6.4) Signed-off-by: Ray Mattingly Co-authored-by: Ray Mattingly Co-authored-by: Hernan Gelaf-Romer --- .../hbase/mapreduce/WALInputFormat.java | 34 ++++++++++-- .../hbase/mapreduce/TestWALInputFormat.java | 55 ++++++++++++++++++- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 7362f585d319..03d3250f54a9 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -318,7 +318,7 @@ List getSplits(final JobContext context, final String startKey, fina for (Path inputPath : inputPaths) { FileSystem fs = inputPath.getFileSystem(conf); try { - List files = getFiles(fs, inputPath, startTime, endTime); + List files = getFiles(fs, inputPath, startTime, endTime, conf); allFiles.addAll(files); } catch (FileNotFoundException e) { if (ignoreMissing) { @@ -349,11 +349,11 @@ private Path[] getInputPaths(Configuration conf) { * equal to this value else we will filter out the file. If name does not seem to * have a timestamp, we will just return it w/o filtering. 
*/ - private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) - throws IOException { + private List getFiles(FileSystem fs, Path dir, long startTime, long endTime, + Configuration conf) throws IOException { List result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); - RemoteIterator iter = fs.listLocatedStatus(dir); + RemoteIterator iter = listLocatedFileStatus(fs, dir, conf); if (!iter.hasNext()) { return Collections.emptyList(); } @@ -361,7 +361,7 @@ private List getFiles(FileSystem fs, Path dir, long startTime, long LocatedFileStatus file = iter.next(); if (file.isDirectory()) { // Recurse into sub directories - result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); + result.addAll(getFiles(fs, file.getPath(), startTime, endTime, conf)); } else { addFile(result, file, startTime, endTime); } @@ -396,4 +396,28 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new WALKeyRecordReader(); } + + /** + * Attempts to return the {@link LocatedFileStatus} for the given directory. If the directory does + * not exist, it will check if the directory is an archived log file and try to find it + */ + private static RemoteIterator listLocatedFileStatus(FileSystem fs, Path dir, + Configuration conf) throws IOException { + try { + return fs.listLocatedStatus(dir); + } catch (FileNotFoundException e) { + if (AbstractFSWALProvider.isArchivedLogFile(dir)) { + throw e; + } + + LOG.warn("Log file {} not found, trying to find it in archive directory.", dir); + Path archiveFile = AbstractFSWALProvider.findArchivedLog(dir, conf); + if (archiveFile == null) { + LOG.error("Did not find archive file for {}", dir); + throw e; + } + + return fs.listLocatedStatus(archiveFile); + } + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java index 70602a371668..6fdfb2bb8e2d 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java @@ -21,24 +21,43 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({ MapReduceTests.class, SmallTests.class }) +@Category({ MapReduceTests.class, MediumTests.class }) public 
class TestWALInputFormat { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALInputFormat.class); + @BeforeClass + public static void setupClass() throws Exception { + TEST_UTIL.startMiniCluster(); + TEST_UTIL.createWALRootDir(); + } + /** * Test the primitive start/end time filtering. */ @@ -74,4 +93,36 @@ public void testAddFile() { WALInputFormat.addFile(lfss, lfs, now, now); assertEquals(8, lfss.size()); } + + @Test + public void testHandlesArchivedWALFiles() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + JobContext ctx = Mockito.mock(JobContext.class); + Mockito.when(ctx.getConfiguration()).thenReturn(conf); + Job job = Job.getInstance(conf); + TableMapReduceUtil.initCredentialsForCluster(job, conf); + Mockito.when(ctx.getCredentials()).thenReturn(job.getCredentials()); + + // Setup WAL file, then archive it + HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); + AbstractFSWAL wal = (AbstractFSWAL) rs.getWALs().get(0); + Path walPath = wal.getCurrentFileName(); + TEST_UTIL.getConfiguration().set(FileInputFormat.INPUT_DIR, walPath.toString()); + TEST_UTIL.getConfiguration().set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + + Path rootDir = CommonFSUtils.getWALRootDir(conf); + Path archiveWal = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + archiveWal = new Path(archiveWal, walPath.getName()); + TEST_UTIL.getTestFileSystem().delete(walPath, true); + TEST_UTIL.getTestFileSystem().mkdirs(archiveWal.getParent()); + TEST_UTIL.getTestFileSystem().create(archiveWal).close(); + + // Test for that we can read from the archived WAL file + WALInputFormat wif = new WALInputFormat(); + List splits = wif.getSplits(ctx); + assertEquals(1, splits.size()); + WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); + assertEquals(archiveWal.toString(), split.getLogFileName()); + } + } From f4f990f55994739e0f7efc9d82235ffaf707e66a Mon Sep 17 00:00:00 2001 From: Sameer Dawani Date: Mon, 14 Jul 2025 13:13:32 -0400 Subject: [PATCH 3/8] HubSpot Backport: Rack aware incremental backup (not yet proposed upstream) --- .../hadoop/hbase/backup/BackupDriver.java | 6 ++ .../hbase/backup/BackupRestoreConstants.java | 8 ++ .../hbase/backup/impl/BackupCommands.java | 20 ++++ .../impl/IncrementalTableBackupClient.java | 3 + .../mapreduce/MapReduceHFileSplitterJob.java | 9 ++ .../hbase/mapreduce/HFileInputFormat.java | 91 +++++++++++++++++++ .../hbase/mapreduce/WALInputFormat.java | 45 ++++++++- .../hadoop/hbase/mapreduce/WALPlayer.java | 24 +++++ 8 files changed, 202 insertions(+), 4 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index d55a280b4aa4..c135e7e0dff9 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_HFILE_LOCATION_RESOLVER; +import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_HFILE_LOCATION_RESOLVER_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP; @@ -35,6 +37,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_DESC; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WAL_LOCATION_RESOLVER; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WAL_LOCATION_RESOLVER_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME; @@ -159,6 +163,8 @@ protected void addOptions() { addOptWithArg(OPTION_PATH, OPTION_PATH_DESC); addOptWithArg(OPTION_KEEP, OPTION_KEEP_DESC); addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_DESC); + addOptWithArg(OPTION_WAL_LOCATION_RESOLVER, OPTION_WAL_LOCATION_RESOLVER_DESC); + addOptWithArg(OPTION_HFILE_LOCATION_RESOLVER, OPTION_HFILE_LOCATION_RESOLVER_DESC); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java index 57454d402173..881e7ccf9353 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java @@ -99,6 +99,14 @@ public interface BackupRestoreConstants { String OPTION_YARN_QUEUE_NAME_DESC = "Yarn queue name to run backup create command on"; String OPTION_YARN_QUEUE_NAME_RESTORE_DESC = "Yarn queue name to run backup restore command on"; + String OPTION_WAL_LOCATION_RESOLVER = "wal-location-resolver"; + String OPTION_WAL_LOCATION_RESOLVER_DESC = + "WAL file location resolver class for rack-aware incremental backup"; + + String OPTION_HFILE_LOCATION_RESOLVER = "hfile-location-resolver"; + String OPTION_HFILE_LOCATION_RESOLVER_DESC = + "HFile location resolver class for rack-aware bulk loading during incremental backup"; + String JOB_NAME_CONF_KEY = "mapreduce.job.name"; String BACKUP_CONFIG_STRING = BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n" diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 66694f4384f4..2c78d0b50c11 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_HFILE_LOCATION_RESOLVER; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM; import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP; @@ -37,6 +38,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WAL_LOCATION_RESOLVER; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME; @@ -83,6 +85,14 @@ public final class BackupCommands { public final static String TOP_LEVEL_NOT_ALLOWED = "Top level (root) folder is not allowed to be a backup destination"; + // Configuration keys for location resolvers + // Must match WALPlayer.CONF_WAL_FILE_LOCATION_RESOLVER_CLASS + private static final String CONF_WAL_FILE_LOCATION_RESOLVER_CLASS = + "wal.backup.file.location.resolver.class"; + // Must match HFileInputFormat.CONF_HFILE_LOCATION_RESOLVER_CLASS + private static final String CONF_HFILE_LOCATION_RESOLVER_CLASS = + "hfile.backup.input.file.location.resolver.class"; + public static final String USAGE = "Usage: hbase backup COMMAND [command-specific arguments]\n" + "where COMMAND is one of:\n" + " create create a new backup image\n" + " delete delete an existing backup image\n" @@ -148,6 +158,16 @@ public void execute() throws IOException { getConf().set("mapreduce.job.queuename", queueName); } + if (cmdline.hasOption(OPTION_WAL_LOCATION_RESOLVER)) { + String resolverClass = cmdline.getOptionValue(OPTION_WAL_LOCATION_RESOLVER); + getConf().set(CONF_WAL_FILE_LOCATION_RESOLVER_CLASS, resolverClass); + } + + if (cmdline.hasOption(OPTION_HFILE_LOCATION_RESOLVER)) { + String resolverClass = cmdline.getOptionValue(OPTION_HFILE_LOCATION_RESOLVER); + getConf().set(CONF_HFILE_LOCATION_RESOLVER_CLASS, resolverClass); + } + // Create connection conn = ConnectionFactory.createConnection(getConf()); if (requiresNoActiveSession()) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 1e07c026f0aa..14592806acec 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -412,6 +412,9 @@ protected void walToHFiles(List dirPaths, List tableList) throws conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true); conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); conf.set(JOB_NAME_CONF_KEY, jobname); + + // Rack-aware WAL processing configuration is set directly via command line to the same key + // WALPlayer uses String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; try { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 28db0c605f79..7d6ad00ddbf1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -105,7 +105,11 @@ public Job createSubmittableJob(String[] args) throws IOException { job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true); job.setJarByClass(MapReduceHFileSplitterJob.class); + + // Use standard HFileInputFormat which now supports location resolver automatically + // HFileInputFormat will automatically detect and log rack-awareness configuration job.setInputFormatClass(HFileInputFormat.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); if (hfileOutPath != null) { @@ -147,6 +151,11 @@ private void usage(final String errorMsg) { System.err.println("Other options:"); System.err.println(" -D " + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the HFile splitter"); + + System.err.println("Rack-aware processing option:"); + System.err.println(" -D" + HFileInputFormat.CONF_HFILE_LOCATION_RESOLVER_CLASS + "= - " + + "HFile location resolver class for rack-aware processing"); + System.err.println("For performance also consider the following options:\n" + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java index 1bbbe513f738..709e9a394533 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java @@ -19,8 +19,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.OptionalLong; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +56,36 @@ public class HFileInputFormat extends FileInputFormat { private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class); + // Configuration key for pluggable location resolver class name + // Used to enable rack-aware processing by providing preferred data locality hints + public static final String CONF_HFILE_LOCATION_RESOLVER_CLASS = + "hfile.backup.input.file.location.resolver.class"; + + /** + * Interface for resolving file locations to influence InputSplit placement for HFile processing. + */ + @InterfaceAudience.Public + public interface HFileLocationResolver { + /** + * Get preferred locations for a group of HFiles to optimize rack-aware processing. + * @param hfiles Collection of HFile paths that will be processed together + * @return Set of preferred host names for processing these HFiles + */ + Set getLocationsForInputFiles(final Collection hfiles); + } + + /** + * Default no-op implementation of HFileLocationResolver. Provides backward compatibility by + * returning no location hints. 
+ */ + public static class NoopHFileLocationResolver implements HFileLocationResolver { + @Override + public Set getLocationsForInputFiles(Collection hfiles) { + // No location hints - lets YARN scheduler decide + return Collections.emptySet(); + } + } + /** * File filter that removes all "hidden" files. This might be something worth removing from a more * general purpose utility; it accounts for the presence of metadata files created in the way @@ -183,4 +217,61 @@ protected boolean isSplitable(JobContext context, Path filename) { // This file isn't splittable. return false; } + + @Override + public List getSplits(JobContext context) throws IOException { + Configuration conf = context.getConfiguration(); + + // Check if location resolver is configured + String resolverClass = conf.get(CONF_HFILE_LOCATION_RESOLVER_CLASS); + if (resolverClass == null) { + LOG.debug("HFile rack-aware processing disabled - no location resolver configured"); + return super.getSplits(context); + } + + LOG.info("HFile rack-aware processing enabled with location resolver: {}", resolverClass); + try { + return createLocationAwareSplits(context, conf); + } catch (Exception e) { + LOG.error("Error creating location-aware splits, falling back to standard splits", e); + return super.getSplits(context); + } + } + + private List createLocationAwareSplits(JobContext context, Configuration conf) + throws IOException { + // Get all HFiles from input paths using existing listStatus logic + List files = listStatus(context); + + // Load configured location resolver + Class resolverClass = + conf.getClass(CONF_HFILE_LOCATION_RESOLVER_CLASS, NoopHFileLocationResolver.class, + HFileLocationResolver.class); + HFileLocationResolver fileLocationResolver = ReflectionUtils.newInstance(resolverClass, conf); + + // Create InputSplits with location hints + List splits = new ArrayList<>(); + + for (FileStatus file : files) { + Path path = file.getPath(); + long length = file.getLen(); + + if (length <= 0) { + LOG.warn("Skipping empty or invalid HFile: {} with length: {}", path, length); + continue; + } + + // Get location hints for this file + Set locations = + fileLocationResolver.getLocationsForInputFiles(Collections.singletonList(path.toString())); + String[] locationArray = locations.toArray(new String[0]); + + splits.add(new FileSplit(path, 0, length, locationArray)); + } + + LOG.info("Created {} location-aware InputSplits from {} HFiles", splits.size(), files.size()); + + return splits; + } + } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 03d3250f54a9..c4f3187b04dc 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -71,6 +71,7 @@ static class WALSplit extends InputSplit implements Writable { private long fileSize; private long startTime; private long endTime; + private String[] locations; /** for serialization */ public WALSplit() { @@ -85,6 +86,19 @@ public WALSplit(String logFileName, long fileSize, long startTime, long endTime) this.fileSize = fileSize; this.startTime = startTime; this.endTime = endTime; + this.locations = new String[] {}; + } + + /** + * Represent an WALSplit with location hints for rack-aware processing. 
+ */ + public WALSplit(String logFileName, long fileSize, long startTime, long endTime, + String[] locations) { + this.logFileName = logFileName; + this.fileSize = fileSize; + this.startTime = startTime; + this.endTime = endTime; + this.locations = locations != null ? locations : new String[] {}; } @Override @@ -94,8 +108,7 @@ public long getLength() throws IOException, InterruptedException { @Override public String[] getLocations() throws IOException, InterruptedException { - // TODO: Find the data node with the most blocks for this WAL? - return new String[] {}; + return locations; } public String getLogFileName() { @@ -116,6 +129,11 @@ public void readFields(DataInput in) throws IOException { fileSize = in.readLong(); startTime = in.readLong(); endTime = in.readLong(); + int locationsCount = in.readInt(); + locations = new String[locationsCount]; + for (int i = 0; i < locationsCount; i++) { + locations[i] = in.readUTF(); + } } @Override @@ -124,6 +142,10 @@ public void write(DataOutput out) throws IOException { out.writeLong(fileSize); out.writeLong(startTime); out.writeLong(endTime); + out.writeInt(locations.length); + for (String location : locations) { + out.writeUTF(location); + } } @Override @@ -328,10 +350,25 @@ List getSplits(final JobContext context, final String startKey, fina throw e; } } - List splits = new ArrayList(allFiles.size()); + // Create InputSplits with location hints for each WAL file + Class locationResolverClass = + conf.getClass(WALPlayer.CONF_WAL_FILE_LOCATION_RESOLVER_CLASS, + WALPlayer.NoopWALFileLocationResolver.class, WALPlayer.WALFileLocationResolver.class); + + WALPlayer.WALFileLocationResolver locationResolver = + org.apache.hadoop.util.ReflectionUtils.newInstance(locationResolverClass, conf); + + List splits = new ArrayList<>(); for (FileStatus file : allFiles) { - splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); + // Get locations for this specific WAL file + String[] locations = locationResolver + .getLocationsForWALFiles(Collections.singletonList(file.getPath().toString())) + .toArray(new String[0]); + + splits + .add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime, locations)); } + return splits; } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index e06300848f68..189727f81b0f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -21,6 +21,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -63,6 +64,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + /** * A tool to replay WAL files as a M/R job. The WAL can be replayed for a set of tables or all * tables, and a time range can be provided (in milliseconds). 
The WAL is filtered to the passed set @@ -85,6 +88,27 @@ public class WALPlayer extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + // Configuration key for pluggable WAL location resolver class name + // Used to enable rack-aware processing by providing preferred data locality hints for WAL files + public static final String CONF_WAL_FILE_LOCATION_RESOLVER_CLASS = + "wal.backup.file.location.resolver.class"; + + /** + * Interface for resolving file locations to influence InputSplit placement for rack-aware WAL + * processing. + */ + @InterfaceAudience.Public + public interface WALFileLocationResolver { + Set getLocationsForWALFiles(final Collection walFiles); + } + + public static class NoopWALFileLocationResolver implements WALFileLocationResolver { + @Override + public Set getLocationsForWALFiles(Collection walFiles) { + return ImmutableSet.of(); + } + } + public WALPlayer() { } From a2f7b66c9a9fe2f688f37eab7a2776b18e7a9533 Mon Sep 17 00:00:00 2001 From: ritika03494 <153202689+ritika03494@users.noreply.github.com> Date: Wed, 30 Jul 2025 16:01:47 +0100 Subject: [PATCH 4/8] HubSpot Edit: Use branched buildpack --- hubspot-client-bundles/.blazar.yaml | 1 + hubspot-client-bundles/hbase-backup-restore-bundle/.blazar.yaml | 2 +- hubspot-client-bundles/hbase-client-bundle/.blazar.yaml | 1 + hubspot-client-bundles/hbase-mapreduce-bundle/.blazar.yaml | 1 + hubspot-client-bundles/hbase-server-it-bundle/.blazar.yaml | 2 +- 5 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hubspot-client-bundles/.blazar.yaml b/hubspot-client-bundles/.blazar.yaml index a57d5eeb071b..8d5dac3de18f 100644 --- a/hubspot-client-bundles/.blazar.yaml +++ b/hubspot-client-bundles/.blazar.yaml @@ -1,5 +1,6 @@ buildpack: name: Blazar-Buildpack-Java + branch: rm-test-hbase env: # Below variables are generated in prepare_environment.sh. diff --git a/hubspot-client-bundles/hbase-backup-restore-bundle/.blazar.yaml b/hubspot-client-bundles/hbase-backup-restore-bundle/.blazar.yaml index 9399e5dc0aa4..509a0cc10fab 100644 --- a/hubspot-client-bundles/hbase-backup-restore-bundle/.blazar.yaml +++ b/hubspot-client-bundles/hbase-backup-restore-bundle/.blazar.yaml @@ -1,6 +1,6 @@ buildpack: name: Blazar-Buildpack-Java - + branch: rm-test-hbase env: # Below variables are generated in prepare_environment.sh. # The build environment requires environment variables to be explicitly defined before they may diff --git a/hubspot-client-bundles/hbase-client-bundle/.blazar.yaml b/hubspot-client-bundles/hbase-client-bundle/.blazar.yaml index 300be28892e8..aba96b1c7dd9 100644 --- a/hubspot-client-bundles/hbase-client-bundle/.blazar.yaml +++ b/hubspot-client-bundles/hbase-client-bundle/.blazar.yaml @@ -1,5 +1,6 @@ buildpack: name: Blazar-Buildpack-Java + branch: rm-test-hbase env: # Below variables are generated in prepare_environment.sh. diff --git a/hubspot-client-bundles/hbase-mapreduce-bundle/.blazar.yaml b/hubspot-client-bundles/hbase-mapreduce-bundle/.blazar.yaml index 5c020e374927..c79dbaaf6044 100644 --- a/hubspot-client-bundles/hbase-mapreduce-bundle/.blazar.yaml +++ b/hubspot-client-bundles/hbase-mapreduce-bundle/.blazar.yaml @@ -1,5 +1,6 @@ buildpack: name: Blazar-Buildpack-Java + branch: rm-test-hbase env: # Below variables are generated in prepare_environment.sh. 
diff --git a/hubspot-client-bundles/hbase-server-it-bundle/.blazar.yaml b/hubspot-client-bundles/hbase-server-it-bundle/.blazar.yaml
index 26db8c8066b3..03fe644bf878 100644
--- a/hubspot-client-bundles/hbase-server-it-bundle/.blazar.yaml
+++ b/hubspot-client-bundles/hbase-server-it-bundle/.blazar.yaml
@@ -1,6 +1,6 @@
 buildpack:
   name: Blazar-Buildpack-Java
-
+  branch: rm-test-hbase
 env:
   # Below variables are generated in prepare_environment.sh.
   # The build environment requires environment variables to be explicitly defined before they may

From 85a319dab653563da7f3a37157fb030f14568b4f Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Thu, 7 Aug 2025 14:58:29 -0400
Subject: [PATCH 5/8] HBASE-29502: Skip meta cache in RegionReplicaReplicationEndpoint when only one replica found

---
 .../RegionReplicaReplicationEndpoint.java     |   9 +-
 .../TestRegionReplicaReplicationEndpoint.java | 100 ++++++++++++++++++
 2 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 754811ce0e04..94b9daf836f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -439,7 +439,7 @@ public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
       // Replicas can take a while to come online. The cache may have only the primary. If we
       // keep going to the cache, we will not learn of the replicas and their locations after
       // they come online.
-      if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) {
+      if (useCache && locations.size() == 1) {
         if (tableDescriptors.get(tableName).getRegionReplication() > 1) {
           // Make an obnoxious log here. See how bad this issue is. Add a timer if happening
           // too much.
@@ -488,6 +488,13 @@ public void append(TableName tableName, byte[] encodedRegionName, byte[] row, } if (locations.size() == 1) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping {} entries in table {} because only one region location was found", + entries.size(), tableName); + for (Entry entry : entries) { + LOG.trace("Skipping: {}", entry); + } + } return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 9a03536f7541..ffca0caabef6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -507,6 +507,106 @@ public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disab } } + @Test + public void testMetaCacheMissTriggersRefresh() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + int regionReplication = 3; + HTableDescriptor htd = HTU.createTableDescriptor(tableName); + htd.setRegionReplication(regionReplication); + createOrEnableTableWithRetries(htd, true); + + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Table table = connection.getTable(tableName); + + try { + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 100); + + RegionLocator rl = connection.getRegionLocator(tableName); + HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); + byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); + rl.close(); + + AtomicLong skippedEdits = new AtomicLong(); + RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = + mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); + when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); + + FSTableDescriptors fstd = + new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); + + RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = + new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, + (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, + fstd); + + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(Bytes.toBytes("testRow")).setFamily(HBaseTestingUtility.fam1) + .setValue(Bytes.toBytes("testValue")).setType(Type.Put).build(); + + Entry entry = + new Entry(new WALKeyImpl(encodedRegionName, tableName, 1), new WALEdit().add(cell)); + + sinkWriter.append(tableName, encodedRegionName, Bytes.toBytes("testRow"), + Lists.newArrayList(entry)); + + assertEquals("No entries should be skipped for valid table", 0, skippedEdits.get()); + + } finally { + table.close(); + connection.close(); + } + } + + @Test + public void testMetaCacheSkippedForSingleReplicaTable() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + int regionReplication = 1; + HTableDescriptor htd = HTU.createTableDescriptor(tableName); + htd.setRegionReplication(regionReplication); + createOrEnableTableWithRetries(htd, true); + + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Table table = connection.getTable(tableName); + + try { + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 100); + + RegionLocator rl = 
connection.getRegionLocator(tableName); + HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); + byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); + rl.close(); + + AtomicLong skippedEdits = new AtomicLong(); + RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = + mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); + when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); + + FSTableDescriptors fstd = + new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); + + RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = + new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, + (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, + fstd); + + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(Bytes.toBytes("testRow")).setFamily(HBaseTestingUtility.fam1) + .setValue(Bytes.toBytes("testValue")).setType(Type.Put).build(); + + Entry entry = + new Entry(new WALKeyImpl(encodedRegionName, tableName, 1), new WALEdit().add(cell)); + + sinkWriter.append(tableName, encodedRegionName, Bytes.toBytes("testRow"), + Lists.newArrayList(entry)); + + assertEquals("No entries should be skipped for single replica table", 0, skippedEdits.get()); + + } finally { + table.close(); + connection.close(); + } + } + private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) { // Helper function to run create/enable table operations with a retry feature boolean continueToRetry = true; From a022e110489634c82063481ca560057986e6a55c Mon Sep 17 00:00:00 2001 From: Hernan Romer Date: Fri, 15 Aug 2025 12:21:53 -0400 Subject: [PATCH 6/8] HBASE-29386 SnapshotProcedure and EnableTableProcedure can cause a deadlock (#7084) (#7122) (#195) Signed-off-by: Duo Zhang Signed-off-by: Ray Mattingly Signed-off-by: Wellington Ramos Chevreuil Co-authored-by: Ray Mattingly Co-authored-by: Hernan Gelaf-Romer --- .../master/procedure/SnapshotProcedure.java | 24 -------- .../hbase/master/procedure/TableQueue.java | 2 +- .../TestSnapshotProcedureConcurrently.java | 57 +++++++++++++++++++ .../procedure/TestSnapshotProcedureRIT.java | 3 +- 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java index a554f967ab9a..572cdce0cf6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java @@ -103,30 +103,6 @@ public TableOperationType getTableOperationType() { return TableOperationType.SNAPSHOT; } - @Override - protected LockState acquireLock(MasterProcedureEnv env) { - // AbstractStateMachineTableProcedure acquires exclusive table lock by default, - // but we may need to downgrade it to shared lock for some reasons: - // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details. - // b. we want to support taking multiple different snapshots on same table on the same time. 
- if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { - return LockState.LOCK_EVENT_WAIT; - } - return LockState.LOCK_ACQUIRED; - } - - @Override - protected void releaseLock(MasterProcedureEnv env) { - env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); - } - - @Override - protected boolean holdLock(MasterProcedureEnv env) { - // In order to avoid enabling/disabling/modifying/deleting table during snapshot, - // we don't release lock during suspend - return true; - } - @Override protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index ec85cddafa0e..8fd44079e11a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -52,6 +52,7 @@ static boolean requireTableExclusiveLock(TableProcedureInterface proc) { case CREATE: case DELETE: case DISABLE: + case SNAPSHOT: case ENABLE: return true; case EDIT: @@ -59,7 +60,6 @@ static boolean requireTableExclusiveLock(TableProcedureInterface proc) { return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); case READ: case FLUSH: - case SNAPSHOT: return false; // region operations are using the shared-lock on the table // and then they will grab an xlock on the region. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java index b4c62aa35dcd..505bc4a49a9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureConcurrently.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -27,6 +28,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -136,4 +138,59 @@ public void run() { SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotOnSameTableProto, TABLE_NAME, CF); } + + @Test + public void testItFailsIfTableIsNotDisabledOrEnabled() throws Exception { + ProcedureExecutor executor = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = executor.getEnvironment(); + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + + TestEnableTableProcedure enableTable = new TestEnableTableProcedure( + master.getMasterProcedureExecutor().getEnvironment(), TABLE_NAME); + long enableProcId = executor.submitProcedure(enableTable); + TEST_UTIL.waitFor(60000, () -> { + Procedure proc = executor.getProcedure(enableProcId); + 
if (proc == null) { + return false; + } + return ((TestEnableTableProcedure) proc).getProcedureState() + == ENABLE_TABLE_MARK_REGIONS_ONLINE; + }); + + // Using a delayed spy ensures we hit the problem state while the table enable procedure + // is waiting to run + SnapshotProcedure snapshotProc = new SnapshotProcedure(env, snapshotProto); + long snapshotProcId = executor.submitProcedure(snapshotProc); + TEST_UTIL.waitTableEnabled(TABLE_NAME); + // Wait for procedure to run and finish + TEST_UTIL.waitFor(60000, () -> executor.getProcedure(snapshotProcId) != null); + TEST_UTIL.waitFor(60000, () -> executor.getProcedure(snapshotProcId) == null); + + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + // Needs to be publicly accessible for Procedure validation + public static class TestEnableTableProcedure extends EnableTableProcedure { + // Necessary for Procedure validation + public TestEnableTableProcedure() { + } + + public TestEnableTableProcedure(MasterProcedureEnv env, TableName tableName) { + super(env, tableName); + } + + public MasterProcedureProtos.EnableTableState getProcedureState() { + return getState(stateCount); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + MasterProcedureProtos.EnableTableState state) throws InterruptedException { + if (state == ENABLE_TABLE_MARK_REGIONS_ONLINE) { + Thread.sleep(10000); + } + + return super.executeFromState(env, state); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java index 60e96f7b80dc..e7cfeae324e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedureRIT.java @@ -53,8 +53,7 @@ public void testTableInMergeWhileTakingSnapshot() throws Exception { () -> procExec.getProcedure(mergeProcId).getState() == ProcedureState.RUNNABLE); SnapshotProcedure sp = new SnapshotProcedure(procExec.getEnvironment(), snapshotProto); long snapshotProcId = procExec.submitProcedure(sp); - TEST_UTIL.waitFor(2000, 1000, () -> procExec.getProcedure(snapshotProcId) != null - && procExec.getProcedure(snapshotProcId).getState() == ProcedureState.WAITING_TIMEOUT); + TEST_UTIL.waitFor(2000, 1000, () -> procExec.getProcedure(snapshotProcId) != null); ProcedureTestingUtility.waitProcedure(procExec, snapshotProcId); SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); } From 66906449789eb7b2bfb7e6abe27fd9ec3697805e Mon Sep 17 00:00:00 2001 From: Siddharth Khillon Date: Wed, 20 Aug 2025 14:24:50 -0700 Subject: [PATCH 7/8] =?UTF-8?q?HubSpot=20Backport:=20HBASE-29469=20Add=20m?= =?UTF-8?q?etrics=20with=20more=20detail=20for=20RpcThrottlingExceptions?= =?UTF-8?q?=20=E2=80=A6=20(#196)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * HBASE-29469 Add metrics with more detail for RpcThrottlingExceptions (#7214) Co-authored-by: skhillon Signed-off by: cconnell Reviewed by: kgeisz * Removing unnecessary sanitization * Remove unnecessary tests --------- Co-authored-by: skhillon --- .../quotas/RegionServerRpcQuotaManager.java | 8 + .../regionserver/MetricsRegionServer.java | 16 ++ .../metrics/MetricsThrottleExceptions.java | 71 +++++ .../regionserver/TestMetricsRegionServer.java | 29 ++ 
.../TestMetricsThrottleExceptions.java | 251 ++++++++++++++++++ 5 files changed, 375 insertions(+) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsThrottleExceptions.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestMetricsThrottleExceptions.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 03fbfde47a13..958793dcdf00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -196,6 +196,10 @@ public OperationQuota checkScanQuota(final Region region, } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" + scanRequest.getScannerId() + ": " + e.getMessage()); + + rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(), + table.getNameAsString()); + throw e; } return quota; @@ -269,6 +273,10 @@ public OperationQuota checkBatchQuota(final Region region, final int numWrites, } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); + + rsServices.getMetrics().recordThrottleException(e.getType(), ugi.getShortUserName(), + table.getNameAsString()); + throw e; } return quota; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index a0bf25dc2eaa..580f77874992 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.metrics.MetricRegistries; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.metrics.Timer; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.regionserver.metrics.MetricsThrottleExceptions; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -47,6 +49,7 @@ public class MetricsRegionServer { private MetricsRegionServerQuotaSource quotaSource; private MetricRegistry metricRegistry; + private MetricsThrottleExceptions throttleMetrics; private Timer bulkLoadTimer; // Incremented once for each call to Scan#nextRaw private Meter serverReadQueryMeter; @@ -78,6 +81,8 @@ public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Confi serverReadQueryMeter = metricRegistry.meter("ServerReadQueryPerSecond"); serverWriteQueryMeter = metricRegistry.meter("ServerWriteQueryPerSecond"); } + + throttleMetrics = new MetricsThrottleExceptions(metricRegistry); } MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, @@ -296,4 +301,15 @@ public void incrScannerLeaseExpired() { serverSource.incrScannerLeaseExpired(); } + /** + * Record a throttle exception with contextual information. 
+ * @param throttleType the type of throttle exception from RpcThrottlingException.Type enum + * @param user the user who triggered the throttle + * @param table the table that was being accessed + */ + public void recordThrottleException(RpcThrottlingException.Type throttleType, String user, + String table) { + throttleMetrics.recordThrottleException(throttleType, user, table); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsThrottleExceptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsThrottleExceptions.java new file mode 100644 index 000000000000..90480c75cbcc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsThrottleExceptions.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.metrics; + +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsThrottleExceptions { + + /** + * The name of the metrics + */ + private static final String METRICS_NAME = "ThrottleExceptions"; + + /** + * The name of the metrics context that metrics will be under. + */ + private static final String METRICS_CONTEXT = "regionserver"; + + /** + * Description + */ + private static final String METRICS_DESCRIPTION = "Metrics about RPC throttling exceptions"; + + /** + * The name of the metrics context that metrics will be under in jmx + */ + private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + private final MetricRegistry registry; + + public MetricsThrottleExceptions(MetricRegistry sharedRegistry) { + registry = sharedRegistry; + } + + /** + * Record a throttle exception with contextual information. 
+ * @param throttleType the type of throttle exception + * @param user the user who triggered the throttle + * @param table the table that was being accessed + */ + public void recordThrottleException(RpcThrottlingException.Type throttleType, String user, + String table) { + String metricName = qualifyThrottleMetric(throttleType, user, table); + registry.counter(metricName).increment(); + } + + private static String qualifyThrottleMetric(RpcThrottlingException.Type throttleType, String user, + String table) { + return String.format("RpcThrottlingException_Type_%s_User_%s_Table_%s", throttleType.name(), + user, table); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index 2186aa7cc4dc..76378f784d49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -26,11 +26,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.metrics.MetricRegistries; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.metrics.MetricsTableRequests; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.JvmPauseMonitor; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -68,6 +71,12 @@ public void setUp() { serverSource = rsm.getMetricsSource(); } + @After + public void tearDown() { + // Clean up global registries after each test to avoid interference + MetricRegistries.global().clear(); + } + @Test public void testWrapperSource() { HELPER.assertTag("serverName", "test", serverSource); @@ -314,4 +323,24 @@ public void testScannerMetrics() { HELPER.assertGauge("activeScanners", 0, serverSource); } + @Test + public void testThrottleExceptionMetricsIntegration() { + // Record different types of throttle exceptions + rsm.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, "alice", "users"); + rsm.recordThrottleException(RpcThrottlingException.Type.WriteSizeExceeded, "bob", "logs"); + rsm.recordThrottleException(RpcThrottlingException.Type.ReadSizeExceeded, "charlie", + "metadata"); + + // Record the same exception multiple times to test increment + rsm.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, "alice", "users"); + rsm.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, "alice", "users"); + + // Verify the specific counters were created and have correct values using HELPER + HELPER.assertCounter("RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_users", + 3L, serverSource); + HELPER.assertCounter("RpcThrottlingException_Type_WriteSizeExceeded_User_bob_Table_logs", 1L, + serverSource); + HELPER.assertCounter("RpcThrottlingException_Type_ReadSizeExceeded_User_charlie_Table_metadata", + 1L, serverSource); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestMetricsThrottleExceptions.java 
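The integration test above asserts on these counters through the region server metrics source; the same counters can also be read straight from the shared MetricRegistry, which is how the unit tests below verify them. A small helper sketch, assuming the hbase-metrics API already used elsewhere in this patch:

    import java.util.Optional;
    import org.apache.hadoop.hbase.metrics.Counter;
    import org.apache.hadoop.hbase.metrics.Metric;
    import org.apache.hadoop.hbase.metrics.MetricRegistry;

    class ThrottleCounterLookupSketch {
      // Returns the current count for a fully-qualified throttle metric name,
      // or 0 if that (type, user, table) combination has never been throttled.
      static long currentCount(MetricRegistry registry, String metricName) {
        Optional<Metric> metric = registry.get(metricName);
        return metric.filter(m -> m instanceof Counter)
            .map(m -> ((Counter) m).getCount())
            .orElse(0L);
      }
    }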
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestMetricsThrottleExceptions.java new file mode 100644 index 000000000000..4f627dc507da --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestMetricsThrottleExceptions.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.metrics.Metric; +import org.apache.hadoop.hbase.metrics.MetricRegistries; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestMetricsThrottleExceptions { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMetricsThrottleExceptions.class); + + private MetricRegistry testRegistry; + private MetricsThrottleExceptions throttleMetrics; + + @After + public void cleanup() { + // Clean up global registries after each test to avoid interference + MetricRegistries.global().clear(); + } + + @Test + public void testBasicThrottleMetricsRecording() { + setupTestMetrics(); + + // Record a throttle exception + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "alice", "users"); + + // Verify the counter exists and has correct value + Optional metric = + testRegistry.get("RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_users"); + assertTrue("Counter metric should be present", metric.isPresent()); + assertTrue("Metric should be a counter", metric.get() instanceof Counter); + + Counter counter = (Counter) metric.get(); + assertEquals("Counter should have count of 1", 1, counter.getCount()); + } + + @Test + public void testMultipleThrottleTypes() { + setupTestMetrics(); + + // Record different types of throttle exceptions + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + 
"alice", "users"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.WriteSizeExceeded, "bob", + "logs"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.ReadSizeExceeded, "charlie", + "metadata"); + + // Verify all three counters were created + verifyCounter(testRegistry, + "RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_users", 1); + verifyCounter(testRegistry, "RpcThrottlingException_Type_WriteSizeExceeded_User_bob_Table_logs", + 1); + verifyCounter(testRegistry, + "RpcThrottlingException_Type_ReadSizeExceeded_User_charlie_Table_metadata", 1); + } + + @Test + public void testCounterIncrement() { + setupTestMetrics(); + + // Record the same throttle exception multiple times + String metricName = "RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_users"; + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "alice", "users"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "alice", "users"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "alice", "users"); + + // Verify the counter incremented correctly + verifyCounter(testRegistry, metricName, 3); + } + + @Test + public void testConcurrentAccess() throws InterruptedException { + setupTestMetrics(); + + int numThreads = 10; + int incrementsPerThread = 100; + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + AtomicInteger exceptions = new AtomicInteger(0); + + // Create multiple threads that increment the same counter concurrently + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < incrementsPerThread; j++) { + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "alice", "users"); + } + } catch (Exception e) { + exceptions.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start all threads at once + startLatch.countDown(); + + // Wait for all threads to complete + boolean completed = doneLatch.await(30, TimeUnit.SECONDS); + assertTrue("All threads should complete within timeout", completed); + assertEquals("No exceptions should occur during concurrent access", 0, exceptions.get()); + + // Verify the final counter value + verifyCounter(testRegistry, + "RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_users", + numThreads * incrementsPerThread); + + executor.shutdown(); + } + + @Test + public void testCommonTableNamePatterns() { + setupTestMetrics(); + + // Test common HBase table name patterns that should be preserved + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, + "service-user", "my-app-logs"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.WriteSizeExceeded, + "batch.process", "namespace:table-name"); + throttleMetrics.recordThrottleException(RpcThrottlingException.Type.ReadSizeExceeded, + "user_123", "test_table_v2"); + + verifyCounter(testRegistry, + "RpcThrottlingException_Type_NumRequestsExceeded_User_service-user_Table_my-app-logs", 1); + verifyCounter(testRegistry, + "RpcThrottlingException_Type_WriteSizeExceeded_User_batch.process_Table_namespace:table-name", + 1); + verifyCounter(testRegistry, + "RpcThrottlingException_Type_ReadSizeExceeded_User_user_123_Table_test_table_v2", 1); 
+ } + + @Test + public void testAllThrottleExceptionTypes() { + setupTestMetrics(); + + // Test all 13 throttle exception types from RpcThrottlingException.Type enum + RpcThrottlingException.Type[] throttleTypes = RpcThrottlingException.Type.values(); + + // Record one exception for each type + for (RpcThrottlingException.Type throttleType : throttleTypes) { + throttleMetrics.recordThrottleException(throttleType, "testuser", "testtable"); + } + + // Verify all counters were created with correct values + for (RpcThrottlingException.Type throttleType : throttleTypes) { + String expectedMetricName = + "RpcThrottlingException_Type_" + throttleType.name() + "_User_testuser_Table_testtable"; + verifyCounter(testRegistry, expectedMetricName, 1); + } + } + + @Test + public void testMultipleInstances() { + setupTestMetrics(); + + // Test that multiple instances of MetricsThrottleExceptions work with the same registry + MetricsThrottleExceptions metrics1 = new MetricsThrottleExceptions(testRegistry); + MetricsThrottleExceptions metrics2 = new MetricsThrottleExceptions(testRegistry); + + // Record different exceptions on each instance + metrics1.recordThrottleException(RpcThrottlingException.Type.NumRequestsExceeded, "alice", + "table1"); + metrics2.recordThrottleException(RpcThrottlingException.Type.WriteSizeExceeded, "bob", + "table2"); + + // Verify both counters exist in the shared registry + verifyCounter(testRegistry, + "RpcThrottlingException_Type_NumRequestsExceeded_User_alice_Table_table1", 1); + verifyCounter(testRegistry, + "RpcThrottlingException_Type_WriteSizeExceeded_User_bob_Table_table2", 1); + } + + /** + * Helper method to set up test metrics registry and instance + */ + private void setupTestMetrics() { + MetricRegistryInfo registryInfo = getRegistryInfo(); + testRegistry = MetricRegistries.global().create(registryInfo); + throttleMetrics = new MetricsThrottleExceptions(testRegistry); + } + + /** + * Helper method to verify a counter exists and has the expected value + */ + private void verifyCounter(MetricRegistry registry, String metricName, long expectedCount) { + Optional metric = registry.get(metricName); + assertTrue("Counter metric '" + metricName + "' should be present", metric.isPresent()); + assertTrue("Metric should be a counter", metric.get() instanceof Counter); + + Counter counter = (Counter) metric.get(); + assertEquals("Counter '" + metricName + "' should have expected count", expectedCount, + counter.getCount()); + } + + /** + * Helper method to create the expected MetricRegistryInfo for ThrottleExceptions + */ + private MetricRegistryInfo getRegistryInfo() { + return new MetricRegistryInfo("ThrottleExceptions", "Metrics about RPC throttling exceptions", + "RegionServer,sub=ThrottleExceptions", "regionserver", false); + } +} From 6dc9e9d0b68e291c746d23055fcb32e87dcacdb5 Mon Sep 17 00:00:00 2001 From: Hernan Gelaf-Romer Date: Thu, 11 Sep 2025 15:15:12 -0400 Subject: [PATCH 8/8] HBASE-28440: Add support for using mapreduce sort in HFileOutputFormat2 (not yet merged upstream) --- .../impl/IncrementalTableBackupClient.java | 4 + .../mapreduce/MapReduceHFileSplitterJob.java | 37 +++- .../hbase/mapreduce/HFileOutputFormat2.java | 36 +++- .../apache/hadoop/hbase/mapreduce/Import.java | 4 + .../mapreduce/KeyOnlyCellComparable.java | 91 ++++++++++ .../mapreduce/PreSortedCellsReducer.java | 46 +++++ .../hadoop/hbase/mapreduce/WALPlayer.java | 38 +++- .../mapreduce/TestCellBasedWALPlayer2.java | 3 +- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 166 +++++++++++++----- 
.../TestUnattainableBalancerCostGoal.java | 4 +- 10 files changed, 366 insertions(+), 63 deletions(-) create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 14592806acec..5f86f2a57b88 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -346,6 +346,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I LOG.debug("Setting incremental copy HFiles job name to : " + jobname); } conf.set(JOB_NAME_CONF_KEY, jobname); + conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); @@ -358,6 +359,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I + " finished."); } finally { deleteBulkLoadDirectory(); + conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); } } @@ -411,6 +413,7 @@ protected void walToHFiles(List dirPaths, List tableList) throws conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true); conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); + conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); conf.set(JOB_NAME_CONF_KEY, jobname); // Rack-aware WAL processing configuration is set directly via command line to the same key @@ -423,6 +426,7 @@ protected void walToHFiles(List dirPaths, List tableList) throws if (result != 0) { throw new IOException("WAL Player failed"); } + conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); conf.unset(JOB_NAME_CONF_KEY); } catch (IOException e) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 7d6ad00ddbf1..4938f9470540 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; @@ -33,11 +34,14 @@ import org.apache.hadoop.hbase.mapreduce.CellSortReducer; import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable; +import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 
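The backup client opts into the new sort path per job by setting the HFileOutputFormat2 flag on the shared Configuration before launching the MapReduce job and unsetting it afterwards, so later jobs reusing the same Configuration are unaffected. A minimal sketch of that toggle, with JobRunner as a hypothetical stand-in for the BackupCopyJob or WALPlayer invocation:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

    class DiskSortToggleSketch {
      interface JobRunner { void run(Configuration conf) throws IOException; }

      static void runWithDiskBasedSorting(Configuration conf, JobRunner job) throws IOException {
        conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
        try {
          job.run(conf);
        } finally {
          // Leave the shared Configuration as we found it.
          conf.unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
        }
      }
    }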
import org.apache.hadoop.hbase.util.MapReduceExtendedCell; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -71,18 +75,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) { /** * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer} */ - static class HFileCellMapper extends Mapper { + static class HFileCellMapper extends Mapper, Cell> { + + private boolean diskBasedSortingEnabled = false; @Override public void map(NullWritable key, Cell value, Context context) throws IOException, InterruptedException { - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), - new MapReduceExtendedCell(value)); + ExtendedCell extendedCell = (ExtendedCell) value; + context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell)); } @Override public void setup(Context context) throws IOException { - // do nothing + diskBasedSortingEnabled = + HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration()); + } + + private WritableComparable wrap(ExtendedCell cell) { + if (diskBasedSortingEnabled) { + return new KeyOnlyCellComparable(cell); + } + return new ImmutableBytesWritable(CellUtil.cloneRow(cell)); } } @@ -109,14 +123,23 @@ public Job createSubmittableJob(String[] args) throws IOException { // Use standard HFileInputFormat which now supports location resolver automatically // HFileInputFormat will automatically detect and log rack-awareness configuration job.setInputFormatClass(HFileInputFormat.class); - - job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf); + if (diskBasedSortingEnabled) { + job.setMapOutputKeyClass(KeyOnlyCellComparable.class); + job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class); + } else { + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + } if (hfileOutPath != null) { LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); TableName tableName = TableName.valueOf(tabName); job.setMapperClass(HFileCellMapper.class); - job.setReducerClass(CellSortReducer.class); + if (diskBasedSortingEnabled) { + job.setReducerClass(PreSortedCellsReducer.class); + } else { + job.setReducerClass(CellSortReducer.class); + } Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(MapReduceExtendedCell.class); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 6ab3bdd25048..7c297015cb03 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; @@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -83,6 +83,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -194,6 +195,11 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) "hbase.mapreduce.hfileoutputformat.extendedcell.enabled"; static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false; + @InterfaceAudience.Private + public static final String DISK_BASED_SORTING_ENABLED_KEY = + "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled"; + private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false; + public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster."; public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum"; @@ -579,12 +585,19 @@ private static void writePartitions(Configuration conf, Path partitionsPath, // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, - ImmutableBytesWritable.class, NullWritable.class); + boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf); + Class keyClass = + diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class; + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class); try { for (ImmutableBytesWritable startKey : sorted) { - writer.append(startKey, NullWritable.get()); + Writable writable = diskBasedSortingEnabled + ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get())) + : startKey; + + writer.append(writable, NullWritable.get()); } } finally { writer.close(); @@ -631,6 +644,13 @@ public static void configureIncrementalLoad(Job job, TableDescriptor tableDescri configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } + public static boolean diskBasedSortingEnabled(Configuration conf) { + boolean res = + conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT); + LOG.info("Disk based sorting on: {}", res); + return res; + } + static void configureIncrementalLoad(Job job, List multiTableInfo, Class> cls) throws IOException { Configuration conf = job.getConfiguration(); @@ -652,7 +672,13 @@ static void configureIncrementalLoad(Job job, List multiTableInfo, // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. 
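When the flag is on, the jobs in this patch swap all three pieces of the shuffle together: the map output key becomes the key-only cell comparable, the sort comparator orders those keys with the cell comparator, and the reducer becomes a pass-through since the framework has already sorted the cells. A condensed sketch of that wiring, assuming the KeyOnlyCellComparable and PreSortedCellsReducer classes added by this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
    import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
    import org.apache.hadoop.mapreduce.Job;

    class SortPathWiringSketch {
      static void wireSortPath(Job job, Configuration conf) {
        if (HFileOutputFormat2.diskBasedSortingEnabled(conf)) {
          // Cells are ordered by the shuffle itself; the reducer only streams them out.
          job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
          job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
          job.setReducerClass(PreSortedCellsReducer.class);
        } else {
          // Default path: row keys in the shuffle, cells sorted in memory by CellSortReducer.
          job.setMapOutputKeyClass(ImmutableBytesWritable.class);
          job.setReducerClass(CellSortReducer.class);
        }
      }
    }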
- if ( + boolean diskBasedSorting = diskBasedSortingEnabled(conf); + + if (diskBasedSorting) { + job.setMapOutputKeyClass(KeyOnlyCellComparable.class); + job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class); + job.setReducerClass(PreSortedCellsReducer.class); + } else if ( KeyValue.class.equals(job.getMapOutputValueClass()) || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass()) ) { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 4adcfbfcd3f6..03abcf159753 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -200,6 +200,10 @@ public CellWritableComparable(Cell kv) { this.kv = kv; } + public Cell getCell() { + return kv; + } + @Override public void write(DataOutput out) throws IOException { int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java new file mode 100644 index 000000000000..a065abd63d08 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class KeyOnlyCellComparable implements WritableComparable { + + static { + WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator()); + } + + private ExtendedCell cell = null; + + public KeyOnlyCellComparable() { + } + + public KeyOnlyCellComparable(ExtendedCell cell) { + this.cell = cell; + } + + public ExtendedCell getCell() { + return cell; + } + + @Override + public int compareTo(KeyOnlyCellComparable o) { + return CellComparator.getInstance().compare(cell, o.cell); + } + + @Override + public void write(DataOutput out) throws IOException { + int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell); + int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value. 
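compareTo above delegates to CellComparator, so comparables built from first-on-row cells sort in plain row order, which is what the partition file and the shuffle rely on. A tiny sketch of that ordering, assuming the class lands in org.apache.hadoop.hbase.mapreduce as shown:

    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyOrderSketch {
      public static void main(String[] args) {
        KeyOnlyCellComparable a =
            new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-a")));
        KeyOnlyCellComparable b =
            new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-b")));
        // CellComparator orders by row first, so "row-a" sorts before "row-b".
        System.out.println(a.compareTo(b) < 0); // true
      }
    }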
+ out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + out.writeInt(keyLen); + out.writeInt(valueLen); + PrivateCellUtil.writeFlatKey(cell, out); + out.writeLong(cell.getSequenceId()); + } + + @Override + public void readFields(DataInput in) throws IOException { + cell = KeyValue.create(in); + long seqId = in.readLong(); + cell.setSequenceId(seqId); + } + + public static class KeyOnlyCellComparator extends WritableComparator { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + try { + KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable(); + kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); + KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable(); + kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); + return compare(kv1, kv2); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java new file mode 100644 index 000000000000..81871ffb59c2 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.MapReduceExtendedCell; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class PreSortedCellsReducer + extends Reducer { + + @Override + protected void reduce(KeyOnlyCellComparable key, Iterable values, Context context) + throws IOException, InterruptedException { + + int index = 0; + for (Cell cell : values) { + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())), + new MapReduceExtendedCell(cell)); + + if (++index % 100 == 0) { + context.setStatus("Wrote " + index + " cells"); + } + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 189727f81b0f..511b989e7542 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -63,7 +65,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; /** @@ -162,9 +163,10 @@ public void setup(Context context) throws IOException { /** * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer} */ - static class WALCellMapper extends Mapper { + static class WALCellMapper extends Mapper, Cell> { private Set tableSet = new HashSet<>(); private boolean multiTableSupport = false; + private boolean diskBasedSortingEnabled = false; @Override public void map(WALKey key, WALEdit value, Context context) throws IOException { @@ -185,7 +187,8 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException { byte[] outKey = multiTableSupport ? 
Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell)) : CellUtil.cloneRow(cell); - context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell)); + ExtendedCell extendedCell = (ExtendedCell) cell; + context.write(wrapKey(outKey, extendedCell), new MapReduceExtendedCell(extendedCell)); } } } catch (InterruptedException e) { @@ -198,8 +201,23 @@ public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); String[] tables = conf.getStrings(TABLES_KEY); this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false); + this.diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf); Collections.addAll(tableSet, tables); } + + private WritableComparable wrapKey(byte[] key, ExtendedCell cell) { + if (this.diskBasedSortingEnabled) { + // Important to build a new cell with the updated key to maintain multi-table support + KeyValue kv = new KeyValue(key, 0, key.length, cell.getFamilyArray(), + cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(), + KeyValue.Type.codeToType(cell.getTypeByte()), null, 0, 0); + kv.setSequenceId(cell.getSequenceId()); + return new KeyOnlyCellComparable(kv); + } else { + return new ImmutableBytesWritable(key); + } + } } /** @@ -377,7 +395,13 @@ public Job createSubmittableJob(String[] args) throws IOException { job.setJarByClass(WALPlayer.class); job.setInputFormatClass(WALInputFormat.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); + boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf); + if (diskBasedSortingEnabled) { + job.setMapOutputKeyClass(KeyOnlyCellComparable.class); + job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class); + } else { + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + } String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); if (hfileOutPath != null) { @@ -396,7 +420,11 @@ public Job createSubmittableJob(String[] args) throws IOException { List tableNames = getTableNameList(tables); job.setMapperClass(WALCellMapper.class); - job.setReducerClass(CellSortReducer.class); + if (diskBasedSortingEnabled) { + job.setReducerClass(PreSortedCellsReducer.class); + } else { + job.setReducerClass(CellSortReducer.class); + } Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(MapReduceExtendedCell.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java index 283acbabf6e4..d6c4b623ad42 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.util.ToolRunner; @@ -172,7 +173,7 @@ private void testWALKeyValueMapper(final String tableConfigKey) throws Exception WALKey key = mock(WALKey.class); when(key.getTableName()).thenReturn(TableName.valueOf("table")); @SuppressWarnings("unchecked") - Mapper.Context context = 
mock(Context.class); + Mapper, Cell>.Context context = mock(Context.class); when(context.getConfiguration()).thenReturn(configuration); WALEdit value = mock(WALEdit.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index c6e51eee40ff..143043a039de 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; @@ -114,6 +113,50 @@ public static void afterClass() throws Exception { logFs.delete(walRootDir, true); } + @Test + public void testDiskBasedSortingEnabled() throws Exception { + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + final byte[] FAMILY = Bytes.toBytes("family"); + final byte[] COLUMN1 = Bytes.toBytes("c1"); + final byte[] COLUMN2 = Bytes.toBytes("c2"); + final byte[] ROW = Bytes.toBytes("row"); + Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); + Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); + + // put a row into the first table + Put p = new Put(ROW); + p.addColumn(FAMILY, COLUMN1, COLUMN1); + p.addColumn(FAMILY, COLUMN2, COLUMN2); + t1.put(p); + // delete one column + Delete d = new Delete(ROW); + d.addColumns(FAMILY, COLUMN1); + t1.delete(d); + + // replay the WAL, map table 1 to table 2 + WAL log = cluster.getRegionServer(0).getWAL(null); + log.rollWriter(); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); + WALPlayer player = new WALPlayer(configuration); + String optionName = "_test_.name"; + configuration.set(optionName, "1000"); + player.setupTime(configuration, optionName); + assertEquals(1000, configuration.getLong(optionName, 0)); + assertEquals(0, ToolRunner.run(configuration, player, + new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); + + // verify the WAL was player into table 2 + Get g = new Get(ROW); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); + } + /** * Test that WALPlayer can replay recovered.edits files. */ @@ -123,19 +166,22 @@ public void testPlayingRecoveredEdit() throws Exception { TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY); // Copy testing recovered.edits file that is over under hbase-server test resources // up into a dir in our little hdfs cluster here. - String hbaseServerTestResourcesEdits = - System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/" - + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName(); - assertTrue(new File(hbaseServerTestResourcesEdits).exists()); - FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); - // Target dir. 
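The new test drives WALPlayer through ToolRunner with the flag set on the job Configuration; the same approach works outside of tests. A hedged sketch of a programmatic driver under that assumption (the WAL directory and table names are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WALPlayerDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
        // Arguments: <WAL input dir> <source table> [<target table>]; values here are placeholders.
        int rc = ToolRunner.run(conf, new WALPlayer(conf),
            new String[] { "/hbase/oldWALs", "source_table", "target_table" });
        System.exit(rc);
      }
    }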
- Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory()); - assertTrue(dfs.mkdirs(targetDir)); - dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir); - assertEquals(0, - ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() })); - // I don't know how many edits are in this file for this table... so just check more than 1. - assertTrue(TEST_UTIL.countRows(tn) > 0); + runWithDiskBasedSortingDisabledAndEnabled(() -> { + String hbaseServerTestResourcesEdits = + System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/" + + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName(); + assertTrue(new File(hbaseServerTestResourcesEdits).exists()); + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + // Target dir. + Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory()); + assertTrue(dfs.mkdirs(targetDir)); + dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir); + assertEquals(0, + ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() })); + // I don't know how many edits are in this file for this table... so just check more than 1. + assertTrue(TEST_UTIL.countRows(tn) > 0); + dfs.delete(targetDir, true); + }); } /** @@ -150,7 +196,7 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { final byte[] column1 = Bytes.toBytes("c1"); final byte[] column2 = Bytes.toBytes("c2"); final byte[] row = Bytes.toBytes("row"); - Table table = TEST_UTIL.createTable(tableName, family); + final Table table = TEST_UTIL.createTable(tableName, family); long now = EnvironmentEdgeManager.currentTime(); // put a row into the first table @@ -188,28 +234,37 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); WALPlayer player = new WALPlayer(configuration); - assertEquals(0, ToolRunner.run(configuration, player, - new String[] { walInputDir, tableName.getNameAsString() })); + final byte[] finalLastVal = lastVal; + + runWithDiskBasedSortingDisabledAndEnabled(() -> { + assertEquals(0, ToolRunner.run(configuration, player, + new String[] { walInputDir, tableName.getNameAsString() })); - Get g = new Get(row); - Result result = table.get(g); - byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); - assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + Get g = new Get(row); + Result result = table.get(g); + byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal))); - table = TEST_UTIL.truncateTable(tableName); - g = new Get(row); - result = table.get(g); - assertThat(result.listCells(), nullValue()); + TEST_UTIL.truncateTable(tableName); + g = new Get(row); + result = table.get(g); + assertThat(result.listCells(), nullValue()); - BulkLoadHFiles.create(configuration).bulkLoad(tableName, - new Path(outPath, tableName.getNameAsString())); + BulkLoadHFiles.create(configuration).bulkLoad(tableName, + new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString())); - g = new Get(row); - result = table.get(g); - value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + g = new Get(row); + result = table.get(g); + value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); - assertThat(result.listCells(), 
notNullValue()); - assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + assertThat(result.listCells(), notNullValue()); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal))); + + // cleanup + Path out = new Path(outPath); + FileSystem fs = out.getFileSystem(configuration); + assertTrue(fs.delete(out, true)); + }); } /** @@ -244,18 +299,21 @@ public void testWALPlayer() throws Exception { Configuration configuration = TEST_UTIL.getConfiguration(); WALPlayer player = new WALPlayer(configuration); - String optionName = "_test_.name"; - configuration.set(optionName, "1000"); - player.setupTime(configuration, optionName); - assertEquals(1000, configuration.getLong(optionName, 0)); - assertEquals(0, ToolRunner.run(configuration, player, - new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); - // verify the WAL was player into table 2 - Get g = new Get(ROW); - Result r = t2.get(g); - assertEquals(1, r.size()); - assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); + runWithDiskBasedSortingDisabledAndEnabled(() -> { + String optionName = "_test_.name"; + configuration.set(optionName, "1000"); + player.setupTime(configuration, optionName); + assertEquals(1000, configuration.getLong(optionName, 0)); + assertEquals(0, ToolRunner.run(configuration, player, + new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); + + // verify the WAL was player into table 2 + Get g = new Get(ROW); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); + }); } /** @@ -335,7 +393,29 @@ public void testMainMethod() throws Exception { System.setErr(oldPrintStream); System.setSecurityManager(SECURITY_MANAGER); } + } + + private static void runWithDiskBasedSortingDisabledAndEnabled(TestMethod method) + throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, + false); + try { + method.run(); + } finally { + TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); + } + + TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, + true); + try { + method.run(); + } finally { + TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); + } + } + private interface TestMethod { + void run() throws Exception; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestUnattainableBalancerCostGoal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestUnattainableBalancerCostGoal.java index 5e95564b6fee..cf3f241cab89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestUnattainableBalancerCostGoal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestUnattainableBalancerCostGoal.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.ServerName; @@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.junit.BeforeClass; import org.junit.ClassRule; import 
org.junit.Test; @@ -41,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + /** * If your minCostNeedsBalance is set too low, then the balancer should still eventually stop making * moves as further cost improvements become impossible, and balancer plan calculation becomes