diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 35f212b44597..7912ab65c81d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -4428,4 +4428,10 @@ protected String getDescription() { } }); } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public MobFileCleanerChore getMobFileCleanerChore() { + return mobFileCleanerChore; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 51f287ce66e1..052058a42ada 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -149,6 +149,12 @@ public final class MobConstants { public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max"; public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1; + public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads"; + public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1; + public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT = + "hbase.master.mob.cleaner.chore.timeout"; + public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 5 minutes + private MobConstants() { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java index 9ce20e7c650e..595078230b3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java @@ -17,30 +17,49 @@ */ package org.apache.hadoop.hbase.mob; +import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT; +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT; + +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.master.HMaster; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete * (files which have no active references to) mob files. */ @InterfaceAudience.Private -public class MobFileCleanerChore extends ScheduledChore { +public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class); + private final HMaster master; - private ExpiredMobFileCleaner cleaner; + private final ExpiredMobFileCleaner cleaner; + private final ThreadPoolExecutor executor; + private final int cleanerFutureTimeout; + private int threadCount; static { Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD, @@ -57,7 +76,21 @@ public MobFileCleanerChore(HMaster master) { this.master = master; cleaner = new ExpiredMobFileCleaner(); cleaner.setConf(master.getConfiguration()); + threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, + MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); + if (threadCount < 1) { + threadCount = 1; + } + + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build(); + + executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), threadFactory); + checkObsoleteConfigurations(); + cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT, + DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT); } private void checkObsoleteConfigurations() { @@ -88,29 +121,93 @@ protected void chore() { LOG.error("MobFileCleanerChore failed", e); return; } + List> futureList = new ArrayList<>(map.size()); for (TableDescriptor htd : map.values()) { - for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { - if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { - try { - cleaner.cleanExpiredMobFiles(htd, hcd); - } catch (IOException e) { - LOG.error("Failed to clean the expired mob files table={} family={}", - htd.getTableName().getNameAsString(), hcd.getNameAsString(), e); - } - } - } + Future future = executor.submit(() -> handleOneTable(htd)); + futureList.add(future); + } + + for (Future future : futureList) { try { - // Now clean obsolete files for a table - LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); - try (final Admin admin = master.getConnection().getAdmin()) { - MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(), - admin); + future.get(cleanerFutureTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e); + Thread.currentThread().interrupt(); + cancelAllFutures(futureList); + break; + } catch (ExecutionException e) { + LOG.error("Exception during execution of MobFileCleanerChore task", e); + } catch (TimeoutException e) { + LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e); + } + } + } + + private void cancelAllFutures(List> futureList) { + long pendingTaskCounter = 0; + for (Future f : futureList) { + if (!f.isDone()) { + f.cancel(true); // interrupt running tasks + pendingTaskCounter++; + } + } + LOG.info("Cancelled {} pending mob file cleaner tasks", pendingTaskCounter); + } + + private void handleOneTable(TableDescriptor htd) { + for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { + if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { + try { + cleaner.cleanExpiredMobFiles(htd, hcd); + } catch (IOException e) { + LOG.error("Failed to clean the expired mob files table={} family={}", + htd.getTableName().getNameAsString(), hcd.getNameAsString(), e); } - LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); - } catch (IOException e) { - LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e); } } + try { + // Now clean obsolete files for a table + LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); + try (final Admin admin = master.getConnection().getAdmin()) { + MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(), + admin); + } + LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); + } catch (IOException e) { + LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e); + } + } + + @Override + public void onConfigurationChange(Configuration conf) { + int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT, + MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT); + if (newThreadCount < 1) { + return; // invalid value , skip the config change + } + + if (newThreadCount != threadCount) { + resizeThreadPool(newThreadCount, newThreadCount); + threadCount = newThreadCount; + } } + private void resizeThreadPool(int newCoreSize, int newMaxSize) { + int currentCoreSize = executor.getCorePoolSize(); + if (newCoreSize > currentCoreSize) { + // Increasing the pool size: Set max first, then core + executor.setMaximumPoolSize(newMaxSize); + executor.setCorePoolSize(newCoreSize); + } else { + // Decreasing the pool size: Set core first, then max + executor.setCorePoolSize(newCoreSize); + executor.setMaximumPoolSize(newMaxSize); + } + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public ThreadPoolExecutor getExecutor() { + return executor; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java new file mode 100644 index 000000000000..3d9cced89354 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND; +import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, MasterTests.class }) +public class TestExpiredMobFileCleanerChore { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner"); + private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2"); + private final static String family = "family"; + private final static byte[] row1 = Bytes.toBytes("row1"); + private final static byte[] row2 = Bytes.toBytes("row2"); + private final static byte[] row3 = Bytes.toBytes("row3"); + private final static byte[] qf = Bytes.toBytes("qf"); + + private static BufferedMutator table; + private static Admin admin; + private static BufferedMutator table2; + private static MobFileCleanerChore mobFileCleanerChore; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2); + TEST_UTIL.startMiniCluster(1); + mobFileCleanerChore = TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore(); + } + + @After + public void cleanUp() throws IOException { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.disableTable(tableName2); + admin.deleteTable(tableName2); + admin.close(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true); + } + + @Test + public void testCleanerSingleThread() throws Exception { + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1); + mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration()); + int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize(); + Assert.assertEquals(1, corePoolSize); + testCleanerInternal(); + } + + @Test + public void testCleanerMultiThread() throws Exception { + TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2); + mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration()); + int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize(); + Assert.assertEquals(2, corePoolSize); + testCleanerInternal(); + } + + private static void init() throws Exception { + TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true) + .setMobThreshold(3L).setMaxVersions(4).build(); + tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); + + admin = TEST_UTIL.getAdmin(); + admin.createTable(tableDescriptorBuilder.build()); + + table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) + .getBufferedMutator(tableName); + + TableDescriptorBuilder tableDescriptorBuilder2 = TableDescriptorBuilder.newBuilder(tableName2); + ColumnFamilyDescriptor columnFamilyDescriptor2 = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true) + .setMobThreshold(3L).setMaxVersions(4).build(); + tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2); + admin.createTable(tableDescriptorBuilder2.build()); + + table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) + .getBufferedMutator(tableName2); + } + + private static void modifyColumnExpiryDays(int expireDays) throws Exception { + + // change ttl as expire days to make some row expired + int timeToLive = expireDays * secondsOfDay(); + ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder + .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L); + columnFamilyDescriptorBuilder.setTimeToLive(timeToLive); + + admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build()); + + ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = ColumnFamilyDescriptorBuilder + .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L); + columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive); + + admin.modifyColumnFamily(tableName2, columnFamilyDescriptorBuilder2.build()); + } + + private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts, + TableName tableName) throws Exception { + + Put put = new Put(row, ts); + put.addColumn(Bytes.toBytes(family), qf, value); + table.mutate(put); + + table.flush(); + admin.flush(tableName); + } + + /** + * Creates a 3 day old hfile and an 1 day old hfile then sets expiry to 2 days. Verifies that the + * 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is + * run. + */ + private static void testCleanerInternal() throws Exception { + init(); + + Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family); + + byte[] dummyData = makeDummyData(600); + long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 1000; // 3 days before + putKVAndFlush(table, row1, dummyData, ts, tableName); + FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // the first mob file + assertEquals("Before cleanup without delay 1", 1, firstFiles.length); + String firstFile = firstFiles[0].getPath().getName(); + + // 1.5 day before + ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000); + putKVAndFlush(table, row2, dummyData, ts, tableName); + FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // now there are 2 mob files + assertEquals("Before cleanup without delay 2", 2, secondFiles.length); + String f1 = secondFiles[0].getPath().getName(); + String f2 = secondFiles[1].getPath().getName(); + String secondFile = f1.equals(firstFile) ? f2 : f1; + + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table, row3, dummyData, ts, tableName); + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table, row3, dummyData, ts, tableName); + FileStatus[] thirdFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + // now there are 4 mob files + assertEquals("Before cleanup without delay 3", 4, thirdFiles.length); + + // modifyColumnExpiryDays(2); // ttl = 2, make the first row expired + + // for table 2 + Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName2, family); + + byte[] dummyData2 = makeDummyData(600); + + putKVAndFlush(table2, row1, dummyData2, ts, tableName2); + FileStatus[] firstFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // the first mob file + assertEquals("Before cleanup without delay 1", 1, firstFiles2.length); + String firstFile2 = firstFiles2[0].getPath().getName(); + + // 1.5 day before + ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000); + putKVAndFlush(table2, row2, dummyData2, ts, tableName2); + FileStatus[] secondFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // now there are 2 mob files + assertEquals("Before cleanup without delay 2", 2, secondFiles2.length); + String f1Second = secondFiles2[0].getPath().getName(); + String f2Second = secondFiles2[1].getPath().getName(); + String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second; + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table2, row3, dummyData2, ts, tableName2); + ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before + putKVAndFlush(table2, row3, dummyData2, ts, tableName2); + FileStatus[] thirdFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + // now there are 4 mob files + assertEquals("Before cleanup without delay 3", 4, thirdFiles2.length); + + modifyColumnExpiryDays(2); // ttl = 2, make the first row expired + + // run the cleaner chore + mobFileCleanerChore.chore(); + + FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath); + String lastFile = filesAfterClean[0].getPath().getName(); + // there are 4 mob files in total, but only 3 need to be cleaned + assertEquals("After cleanup without delay 1", 1, filesAfterClean.length); + assertEquals("After cleanup without delay 2", secondFile, lastFile); + + filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2); + lastFile = filesAfterClean[0].getPath().getName(); + // there are 4 mob files in total, but only 3 need to be cleaned + assertEquals("After cleanup without delay 1", 1, filesAfterClean.length); + assertEquals("After cleanup without delay 2", secondFile2, lastFile); + } + + private static int secondsOfDay() { + return 24 * 3600; + } + + private static byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + Bytes.random(dummyData); + return dummyData; + } +}