Skip to content

Commit 8df8aa6

Browse files
zclllyybbYour Name
authored andcommitted
[Feature](partition) Support new partition recycle mechanism (#57013)
now support: ```sql create table auto_recycle( k0 datetime(6) not null ) auto partition by range (date_trunc(k0, 'day')) () DISTRIBUTED BY HASH(`k0`) BUCKETS 1 properties( "replication_num" = "1", "partition.retention_count" = "3" ); ``` which means only keep the latest 3 partition in history partitions. ccr will be tested in selectdb/ccr-syncer#646 ### Release note Resolve the problem of different name format when need to recycle auto partition
1 parent e818321 commit 8df8aa6

File tree

17 files changed

+572
-13
lines changed

17 files changed

+572
-13
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ private boolean processAlterOlapTableInternal(List<AlterClause> alterClauses, Ol
255255
// TODO(Drogon): check error
256256
((SchemaChangeHandler) schemaChangeHandler).updateBinlogConfig(db, olapTable, alterClauses);
257257
} else if (currentAlterOps.hasSchemaChangeOp()) {
258-
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
258+
// schema change, or change properties that need schema change(dynamic partition, storage_medium...)
259259
schemaChangeHandler.process(sql, alterClauses, db, olapTable);
260260
} else if (currentAlterOps.hasRollupOp()) {
261261
materializedViewHandler.process(alterClauses, db, olapTable);
@@ -589,6 +589,11 @@ private void processModifyEngineInternal(Database db, Table externalTable,
589589
}
590590
}
591591

592+
/*
593+
* There's two ways to process properties' change:
594+
* 1. processAlterOlapTable will trigger schemaChangeHandler.process
595+
* 2. as ModifyTablePropertiesClause trigger schemaChangeHandler.updateTableProperties
596+
*/
592597
public void processAlterTable(AlterTableCommand command) throws UserException {
593598
TableNameInfo dbTableName = command.getTbl();
594599
String ctlName = dbTableName.getCtl();

fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1974,6 +1974,7 @@ public int getAsInt() {
19741974
Map<String, String> propertyMap = new HashMap<>();
19751975
for (AlterClause alterClause : alterClauses) {
19761976
Map<String, String> properties = alterClause.getProperties();
1977+
// alter table properties
19771978
if (properties != null) {
19781979
if (propertyMap.isEmpty()) {
19791980
propertyMap.putAll(properties);
@@ -2004,6 +2005,10 @@ public int getAsInt() {
20042005
return;
20052006
} else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
20062007
DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties);
2008+
if (olapTable.getPartitionRetentionCount() > 0) {
2009+
throw new DdlException("Can not use partition.retention_count and "
2010+
+ "dynamic_partition properties at the same time");
2011+
}
20072012
if (!olapTable.dynamicPartitionExists()) {
20082013
try {
20092014
DynamicPartitionUtil.checkInputDynamicPartitionProperties(properties,
@@ -2352,6 +2357,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
23522357
add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
23532358
add(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY);
23542359
add(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM);
2360+
add(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT);
23552361
}
23562362
};
23572363
List<String> notAllowedProps = properties.keySet().stream().filter(s -> !allowedProps.contains(s))
@@ -2372,6 +2378,13 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
23722378
olapTable.readUnlock();
23732379
}
23742380

2381+
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT)
2382+
&& !(olapTable.getPartitionInfo().enableAutomaticPartition()
2383+
&& olapTable.getPartitionInfo().getType() == PartitionType.RANGE)) {
2384+
throw new UserException("Only AUTO RANGE PARTITION table could set "
2385+
+ PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT);
2386+
}
2387+
23752388
String inMemory = properties.get(PropertyAnalyzer.PROPERTIES_INMEMORY);
23762389
int isInMemory = -1; // < 0 means don't update inMemory properties
23772390
if (inMemory != null) {
@@ -2442,7 +2455,8 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
24422455
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
24432456
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
24442457
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)
2445-
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM)) {
2458+
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM)
2459+
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT)) {
24462460
LOG.info("Properties already up-to-date");
24472461
return;
24482462
}
@@ -2489,6 +2503,9 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
24892503
} finally {
24902504
olapTable.writeUnlock();
24912505
}
2506+
2507+
// after modifyTableProperties, buildPartitionRetentionCount has been done.
2508+
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
24922509
}
24932510

24942511
/**

fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2727,6 +2727,10 @@ protected void setTableStateToNormalAndUpdateProperties(Database db, boolean com
27272727
DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
27282728
}
27292729
}
2730+
// auto partition with retention_count also need to be registered to dynamic scheduler
2731+
if (committed && olapTbl.getPartitionRetentionCount() > 0) {
2732+
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTbl, isReplay);
2733+
}
27302734
if (committed && isBeingSynced) {
27312735
olapTbl.setBeingSyncedProperties();
27322736
}

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3684,6 +3684,12 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
36843684
sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().getProperties(replicaAlloc));
36853685
}
36863686

3687+
// partition retention count
3688+
if (olapTable.getPartitionRetentionCount() > 0) {
3689+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT).append("\" = \"");
3690+
sb.append(olapTable.getPartitionRetentionCount()).append("\"");
3691+
}
3692+
36873693
// only display z-order sort info
36883694
if (olapTable.isZOrderSort()) {
36893695
sb.append(olapTable.getDataSortInfo().toSql());
@@ -6046,7 +6052,8 @@ public void modifyTableProperties(Database db, OlapTable table, Map<String, Stri
60466052
.buildTimeSeriesCompactionEmptyRowsetsThreshold()
60476053
.buildTimeSeriesCompactionLevelThreshold()
60486054
.buildTTLSeconds()
6049-
.buildAutoAnalyzeProperty();
6055+
.buildAutoAnalyzeProperty()
6056+
.buildPartitionRetentionCount();
60506057

60516058
// need to update partition info meta
60526059
for (Partition partition : table.getPartitions()) {

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2456,6 +2456,22 @@ public void setTTLSeconds(long ttlSeconds) {
24562456
tableProperty.buildTTLSeconds();
24572457
}
24582458

2459+
public long getPartitionRetentionCount() {
2460+
if (tableProperty != null) {
2461+
return tableProperty.getPartitionRetentionCount();
2462+
}
2463+
return -1;
2464+
}
2465+
2466+
public void setPartitionRetentionCount(long partitionRetentionCount) {
2467+
if (tableProperty == null) {
2468+
tableProperty = new TableProperty(new HashMap<>());
2469+
}
2470+
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT,
2471+
Long.valueOf(partitionRetentionCount).toString());
2472+
tableProperty.buildPartitionRetentionCount();
2473+
}
2474+
24592475
public boolean getEnableLightSchemaChange() {
24602476
if (tableProperty != null) {
24612477
return tableProperty.getUseSchemaLightChange();

fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class TableProperty implements GsonPostProcessable {
6464
private boolean isInMemory = false;
6565
private short minLoadReplicaNum = -1;
6666
private long ttlSeconds = 0L;
67+
private int partitionRetentionCount = -1;
6768
private boolean isInAtomicRestore = false;
6869

6970
private String storagePolicy = "";
@@ -168,6 +169,7 @@ public TableProperty buildProperty(short opCode) {
168169
buildTimeSeriesCompactionLevelThreshold();
169170
buildTTLSeconds();
170171
buildAutoAnalyzeProperty();
172+
buildPartitionRetentionCount();
171173
break;
172174
default:
173175
break;
@@ -253,10 +255,27 @@ public TableProperty buildTTLSeconds() {
253255
return this;
254256
}
255257

258+
public TableProperty buildPartitionRetentionCount() {
259+
String value = properties.get(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT);
260+
if (value != null) {
261+
try {
262+
int n = Integer.parseInt(value);
263+
partitionRetentionCount = n > 0 ? n : -1;
264+
} catch (NumberFormatException e) {
265+
partitionRetentionCount = -1;
266+
}
267+
}
268+
return this;
269+
}
270+
256271
public long getTTLSeconds() {
257272
return ttlSeconds;
258273
}
259274

275+
public int getPartitionRetentionCount() {
276+
return partitionRetentionCount;
277+
}
278+
260279
public TableProperty buildEnableLightSchemaChange() {
261280
enableLightSchemaChange = Boolean.parseBoolean(
262281
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE, "false"));
@@ -785,6 +804,7 @@ public void gsonPostProcess() throws IOException {
785804
buildTTLSeconds();
786805
buildVariantEnableFlattenNested();
787806
buildInAtomicRestore();
807+
buildPartitionRetentionCount();
788808
removeDuplicateReplicaNumProperty();
789809
buildReplicaAllocation();
790810
buildTDEAlgorithm();

fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.doris.catalog.RangePartitionInfo;
4242
import org.apache.doris.catalog.RangePartitionItem;
4343
import org.apache.doris.catalog.Table;
44+
import org.apache.doris.common.AnalysisException;
4445
import org.apache.doris.common.Config;
4546
import org.apache.doris.common.DdlException;
4647
import org.apache.doris.common.FeConstants;
@@ -55,6 +56,7 @@
5556
import org.apache.doris.common.util.TimeUtils;
5657
import org.apache.doris.datasource.InternalCatalog;
5758
import org.apache.doris.meta.MetaContext;
59+
import org.apache.doris.nereids.util.DateUtils;
5860
import org.apache.doris.persist.PartitionPersistInfo;
5961
import org.apache.doris.thrift.TStorageMedium;
6062

@@ -69,6 +71,7 @@
6971
import org.apache.logging.log4j.Logger;
7072

7173
import java.time.ZonedDateTime;
74+
import java.time.format.DateTimeFormatter;
7275
import java.util.ArrayList;
7376
import java.util.Collection;
7477
import java.util.Collections;
@@ -497,7 +500,7 @@ private Range<PartitionKey> getClosedRange(Database db, OlapTable olapTable, Col
497500
* 1. get the range of [start, 0) as a reserved range.
498501
* 2. get DropPartitionClause of partitions which range are before this reserved range.
499502
*/
500-
private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapTable olapTable,
503+
private ArrayList<DropPartitionClause> getDropPartitionClauseForDynamic(Database db, OlapTable olapTable,
501504
Column partitionColumn, String partitionFormat) throws DdlException {
502505
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
503506
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
@@ -592,6 +595,66 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
592595
return dropPartitionClauses;
593596
}
594597

598+
/**
599+
* Get drop partition clauses for AUTO PARTITION tables based on partition.retention_count
600+
*/
601+
private ArrayList<DropPartitionClause> getDropPartitionClauseForAutoPartition(Database db, OlapTable olapTable)
602+
throws DdlException {
603+
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
604+
605+
// when create/modify table, we already checked retentionCount validity.
606+
// only restrict history partitions with this value. not drop any future partition.
607+
int retentionCount = olapTable.getTableProperty().getPartitionRetentionCount();
608+
609+
RangePartitionInfo info = (RangePartitionInfo) (olapTable.getPartitionInfo());
610+
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
611+
// Sort partitions by upper endpoint to keep the latest N partitions
612+
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
613+
614+
// Get current time as PartitionKey for comparison
615+
ZonedDateTime now = ZonedDateTime.now(DateUtils.getTimeZone());
616+
Column partitionColumn = info.getPartitionColumns().get(0);
617+
String partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
618+
String currentTimeStr = DateTimeFormatter.ofPattern(partitionFormat).format(now);
619+
620+
PartitionValue currentTimeValue = new PartitionValue(currentTimeStr);
621+
PartitionKey currentTimeKey;
622+
try {
623+
currentTimeKey = PartitionKey.createPartitionKey(
624+
Collections.singletonList(currentTimeValue),
625+
Collections.singletonList(partitionColumn));
626+
} catch (AnalysisException e) { // theoretically will not happen
627+
throw new DdlException("Error in create current time partition key. table: "
628+
+ olapTable.getName() + ", error: " + e.getMessage());
629+
}
630+
631+
// Filter out current and future partitions (upper bound >= current time)
632+
List<Map.Entry<Long, PartitionItem>> historyPartitions = new ArrayList<>();
633+
for (Map.Entry<Long, PartitionItem> entry : idToItems) {
634+
RangePartitionItem partitionItem = (RangePartitionItem) entry.getValue();
635+
PartitionKey upperBound = partitionItem.getItems().upperEndpoint();
636+
637+
// Only keep history partitions (upper bound < current time)
638+
if (upperBound.compareTo(currentTimeKey) < 0) {
639+
historyPartitions.add(entry);
640+
}
641+
}
642+
643+
int total = historyPartitions.size();
644+
int keep = Math.min(retentionCount, total);
645+
646+
// Drop partitions except the latest 'keep' partitions
647+
for (int i = 0; i < Math.max(0, total - keep); i++) {
648+
Long dropPartitionId = historyPartitions.get(i).getKey();
649+
String dropPartitionName = olapTable.getPartition(dropPartitionId).getName();
650+
// Do not drop the partition "by force", or the partition will be dropped directly instead of being in
651+
// catalog recycle bin. This is for safe reason.
652+
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName, false, false));
653+
}
654+
655+
return dropPartitionClauses;
656+
}
657+
595658
// make public just for fe ut
596659
public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
597660
boolean executeFirstTime) throws DdlException {
@@ -615,8 +678,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
615678
// Only OlapTable has DynamicPartitionProperty
616679
if (olapTable == null
617680
|| olapTable instanceof MTMV
618-
|| !olapTable.dynamicPartitionExists()
619-
|| !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
681+
// OR (NOT dynamic partition AND NOT partition.retention_count)
682+
|| (!olapTable.dynamicPartitionExists()
683+
|| !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable())
684+
&& olapTable.getPartitionRetentionCount() <= 0) {
620685
iterator.remove();
621686
continue;
622687
} else if (olapTable.isBeingSynced()) {
@@ -630,6 +695,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
630695
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), errorMsg, olapTable.getId());
631696
skipAddPartition = true;
632697
}
698+
// only do clean for auto partition table
699+
if (olapTable.getPartitionRetentionCount() > 0) {
700+
skipAddPartition = true;
701+
}
633702

634703
// Determine the partition column type
635704
// if column type is Date, format partition name as yyyyMMdd
@@ -658,7 +727,14 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
658727
executeFirstTime);
659728
}
660729
clearDropPartitionFailedMsg(olapTable.getId());
661-
dropPartitionClauses = getDropPartitionClause(db, olapTable, partitionColumn, partitionFormat);
730+
if (olapTable.getPartitionRetentionCount() > 0) {
731+
// Handle AUTO PARTITION cleanup based on partition.retention_count
732+
dropPartitionClauses = getDropPartitionClauseForAutoPartition(db, olapTable);
733+
} else {
734+
// Handle dynamic partition cleanup
735+
dropPartitionClauses = getDropPartitionClauseForDynamic(db, olapTable, partitionColumn,
736+
partitionFormat);
737+
}
662738
tableName = olapTable.getName();
663739
} catch (Exception e) {
664740
LOG.warn("db [{}-{}], table [{}-{}]'s dynamic partition has error",

fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
2626
import org.apache.doris.catalog.OlapTable;
2727
import org.apache.doris.catalog.Partition;
28+
import org.apache.doris.catalog.PartitionType;
2829
import org.apache.doris.catalog.Table;
2930
import org.apache.doris.catalog.Tablet;
3031
import org.apache.doris.cloud.proto.Cloud;
@@ -33,6 +34,7 @@
3334
import org.apache.doris.common.DdlException;
3435
import org.apache.doris.common.MetaNotFoundException;
3536
import org.apache.doris.common.UserException;
37+
import org.apache.doris.common.util.DynamicPartitionUtil;
3638
import org.apache.doris.common.util.PropertyAnalyzer;
3739

3840
import com.google.common.base.Preconditions;
@@ -91,6 +93,7 @@ public void updatePartitionsProperties(Database db, String tableName, List<Strin
9193
}
9294
}
9395

96+
//TODO: extract the common code with SchemaChangeHandler
9497
@Override
9598
public void updateTableProperties(Database db, String tableName, Map<String, String> properties)
9699
throws UserException {
@@ -108,6 +111,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
108111
add(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION);
109112
add(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
110113
add(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY);
114+
add(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT);
111115
}
112116
};
113117
List<String> notAllowedProps = properties.keySet().stream().filter(s -> !allowedProps.contains(s))
@@ -124,6 +128,13 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
124128
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
125129
UpdatePartitionMetaParam param = new UpdatePartitionMetaParam();
126130

131+
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT)
132+
&& !(olapTable.getPartitionInfo().enableAutomaticPartition()
133+
&& olapTable.getPartitionInfo().getType() == PartitionType.RANGE)) {
134+
throw new UserException("Only AUTO RANGE PARTITION table could set "
135+
+ PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_COUNT);
136+
}
137+
127138
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
128139
long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties);
129140
olapTable.readLock();
@@ -351,6 +362,9 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
351362
} finally {
352363
olapTable.writeUnlock();
353364
}
365+
366+
// after modifyTableProperties, buildPartitionRetentionCount has been done.
367+
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
354368
}
355369

356370
private static class UpdatePartitionMetaParam {

0 commit comments

Comments
 (0)