Do not throw recursive update exception when producer state recovery failed #1982
### Motivation
When transactions are enabled, `PartitionLog#initialise` recovers the
producer state from the local snapshot. This is an asynchronous operation
that can fail. When it does, a "Recursive update" `IllegalStateException`
is thrown, which is unexpected.
```
Suppressed: java.lang.IllegalStateException: Recursive update
    at java.util.concurrent.ConcurrentHashMap.replaceNode(ConcurrentHashMap.java:1167) ~[?:?]
    at java.util.concurrent.ConcurrentHashMap.remove(ConcurrentHashMap.java:1552) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLogManager.lambda$getLog$0(PartitionLogManager.java:88) ~[?:?]
```
The reason is that in `PartitionLogManager#getLog`, `logMap.remove` is
called in the `whenComplete` callback. If the future has already
completed, that callback runs synchronously in the calling thread, so
`remove` is invoked inside the mapping function (the 2nd argument) of
`computeIfAbsent` on the same map.
https://github.com/streamnative/kop/blob/3602c9e826d903d97091af1cc608b9d88c1b8cf3/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java#L88
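
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not the KoP code: the class name, the `String` value type, and the `"topic-0"` key are placeholders, and it assumes JDK 9 or later, where `ConcurrentHashMap` detects the nested update and `whenComplete` attaches the callback's exception as a suppressed exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone reproduction; none of these names come from KoP.
final class RecursiveUpdateDemo {

    // Returns the exceptions suppressed on the original recovery failure.
    static Throwable[] reproduce() {
        ConcurrentHashMap<String, CompletableFuture<String>> logMap = new ConcurrentHashMap<>();
        RuntimeException recoveryFailure = new RuntimeException("recovery failed");

        logMap.computeIfAbsent("topic-0", key -> {
            // Stand-in for an initialisation that has already failed.
            CompletableFuture<String> future = new CompletableFuture<>();
            future.completeExceptionally(recoveryFailure);
            // The future is already complete, so this callback runs right here,
            // in the same thread, while computeIfAbsent still owns the bin.
            return future.whenComplete((log, error) -> {
                if (error != null) {
                    logMap.remove(key); // throws IllegalStateException on JDK 9+
                }
            });
        });

        // whenComplete swallows the callback's exception and records it as
        // suppressed on the original failure.
        return recoveryFailure.getSuppressed();
    }

    public static void main(String[] args) {
        for (Throwable suppressed : reproduce()) {
            System.out.println("Suppressed: " + suppressed);
        }
    }
}
```

This also explains why the stack trace shows the exception under `Suppressed:`: it is raised inside the `whenComplete` callback and attached to the original recovery error rather than thrown directly to the caller.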
### Modifications
Store the future of `PartitionLog` in `PartitionLogManager`, and move the
`remove` call out of `computeIfAbsent` into the `exceptionally`
callback of `ReplicaManager#getPartitionLog`.
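
A rough sketch of this approach follows. It is an illustration under assumptions, not the code in this PR: `FutureCacheSketch`, the `fail` flag, the `String` value type, and the stand-in `initialise` method are all hypothetical, and the two-argument `remove(key, value)` is a choice made here so that only the failed future itself is evicted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; names and types are placeholders, not KoP classes.
final class FutureCacheSketch {
    private final ConcurrentHashMap<String, CompletableFuture<String>> logMap =
            new ConcurrentHashMap<>();

    // Stand-in for the asynchronous initialisation, which may fail.
    private CompletableFuture<String> initialise(String key, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new RuntimeException("recovery failed"));
        } else {
            future.complete("log-for-" + key);
        }
        return future;
    }

    // The mapping function only creates the future; it never touches the map.
    CompletableFuture<String> getLog(String key, boolean fail) {
        return logMap.computeIfAbsent(key, k -> initialise(k, fail));
    }

    // The cleanup is registered after computeIfAbsent has returned, so the
    // remove call can no longer run inside the mapping function.
    CompletableFuture<String> getPartitionLog(String key, boolean fail) {
        CompletableFuture<String> future = getLog(key, fail);
        future.exceptionally(error -> {
            logMap.remove(key, future); // evict only this failed future
            return null;
        });
        return future;
    }

    int size() {
        return logMap.size();
    }
}
```

With this shape, a failed future is evicted so a later call can retry, while a successful future stays cached.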
@BewareMyPower: Thanks for your contribution. For this PR, do we need to update docs?
Codecov Report
 @@             Coverage Diff              @@
##             master    #1982      +/-   ##
============================================
+ Coverage     17.72%   17.75%   +0.03%     
- Complexity      751      752       +1     
============================================
  Files           195      195              
  Lines         14156    14146      -10     
  Branches       1322     1319       -3     
============================================
+ Hits           2509     2512       +3     
+ Misses        11464    11452      -12     
+ Partials        183      182       -1     
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    log.error("Failed to get PartitionLog for {} under {}", topicPartition, namespacePrefix, e.getCause());
Maybe we need to remove the failed future.
It will be removed in the `exceptionally` callback of the returned future.
This PR has a serious problem, so it is a draft for now.