Skip to content

Commit b679698

Browse files
committed
[FEATURE] Add an rpc method to obtain the uniffleId and delete the Write Stage for retry at the same time.
1 parent 1225f72 commit b679698

File tree

35 files changed

+405
-1329
lines changed

35 files changed

+405
-1329
lines changed

client-spark/common/src/main/java/org/apache/ShuffleIdMappingManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,30 @@ public int getOrCreateUniffleShuffleId(int shuffleId) {
5858
public void recordShuffleIdDeterminate(int shuffleId, boolean isDeterminate) {
5959
shuffleDeterminateMap.put(shuffleId, isDeterminate);
6060
}
61+
62+
public boolean getShuffleIdDeterminate(int shuffleId) {
63+
return shuffleDeterminateMap.get(shuffleId);
64+
}
65+
66+
public boolean hasUniffleShuffleIdByStageNumber(int shuffleId, int stageAttemptNumber) {
67+
return shuffleIdMapping.get(shuffleId).get(stageAttemptNumber) != null;
68+
}
69+
70+
public int getUniffleShuffleIdByStageNumber(int shuffleId, int stageAttemptNumber) {
71+
return shuffleIdMapping.get(shuffleId).get(stageAttemptNumber);
72+
}
73+
74+
public int getUniffleShuffleIdByStageNumber(int shuffleId) {
75+
return shuffleIdMapping.get(shuffleId).entrySet().stream()
76+
.sorted(Map.Entry.<Integer, Integer>comparingByKey().reversed())
77+
.findFirst()
78+
.get()
79+
.getValue();
80+
}
81+
82+
public int createUniffleShuffleId(int shuffleId, int stageAttemptNumber) {
83+
int generatorShuffleId = shuffleIdGenerator.incrementAndGet();
84+
shuffleIdMapping.get(shuffleId).put(stageAttemptNumber, generatorShuffleId);
85+
return generatorShuffleId;
86+
}
6187
}

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,6 @@ public class RssSparkConfig {
4646
.withDeprecatedKeys(RssClientConfig.RSS_RESUBMIT_STAGE)
4747
.withDescription("Whether to enable the resubmit stage for fetch/write failure");
4848

49-
public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED =
50-
ConfigOptions.key("rss.stageRetry.fetchFailureEnabled")
51-
.booleanType()
52-
.defaultValue(false)
53-
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
54-
.withDescription(
55-
"If set to true, the stage retry mechanism will be enabled when a fetch failure occurs.");
56-
57-
public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED =
58-
ConfigOptions.key("rss.stageRetry.writeFailureEnabled")
59-
.booleanType()
60-
.defaultValue(false)
61-
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
62-
.withDescription(
63-
"If set to true, the stage retry mechanism will be enabled when a write failure occurs.");
64-
6549
public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
6650
ConfigOptions.key("rss.blockId.selfManagementEnabled")
6751
.booleanType()

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.uniffle.common.exception.RssFetchFailedException;
5555
import org.apache.uniffle.common.util.Constants;
5656

57-
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
57+
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED;
5858

5959
public class RssSparkShuffleUtils {
6060

@@ -369,7 +369,7 @@ public static RssException reportRssFetchFailedException(
369369
int stageAttemptId,
370370
Set<Integer> failedPartitions) {
371371
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
372-
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
372+
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_ENABLED)
373373
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
374374
for (int partitionId : failedPartitions) {
375375
RssReportShuffleFetchFailureRequest req =

client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,26 @@
2121
import java.io.IOException;
2222
import java.util.Map;
2323

24-
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
24+
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
2525

2626
import org.apache.uniffle.common.util.JavaUtils;
2727

2828
public class ShuffleHandleInfoManager implements Closeable {
29-
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
29+
private Map<Integer, MutableShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
3030

3131
public ShuffleHandleInfoManager() {
3232
this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
3333
}
3434

35-
public ShuffleHandleInfo get(int shuffleId) {
35+
public MutableShuffleHandleInfo get(int shuffleId) {
3636
return shuffleIdToShuffleHandleInfo.get(shuffleId);
3737
}
3838

3939
public void remove(int shuffleId) {
4040
shuffleIdToShuffleHandleInfo.remove(shuffleId);
4141
}
4242

43-
public void register(int shuffleId, ShuffleHandleInfo handle) {
43+
public void register(int shuffleId, MutableShuffleHandleInfo handle) {
4444
shuffleIdToShuffleHandleInfo.put(shuffleId, handle);
4545
}
4646

client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java

Lines changed: 0 additions & 144 deletions
This file was deleted.

client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -77,31 +77,6 @@ public void reportShuffleResult(
7777
managerClientSupplier.get().reportShuffleResult(request);
7878
}
7979

80-
@Override
81-
public void reportShuffleResult(
82-
Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds,
83-
String appId,
84-
int shuffleId,
85-
long taskAttemptId,
86-
int bitmapNum,
87-
Set<ShuffleServerInfo> reportFailureServers,
88-
boolean enableWriteFailureRetry) {
89-
Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
90-
for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
91-
for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
92-
int partitionId = entry.getKey();
93-
partitionToBlockIds
94-
.computeIfAbsent(partitionId, x -> new ArrayList<>())
95-
.addAll(entry.getValue());
96-
}
97-
}
98-
99-
RssReportShuffleResultRequest request =
100-
new RssReportShuffleResultRequest(
101-
appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
102-
managerClientSupplier.get().reportShuffleResult(request);
103-
}
104-
10580
@Override
10681
public Roaring64NavigableMap getShuffleResult(
10782
String clientType,

0 commit comments

Comments
 (0)