Skip to content

Commit d034f64

Browse files
[FIXED] 'cluster_traffic: owner' is forgotten after restart (#7191)
`cluster_traffic: owner` can be used to have replication traffic go over the account "owning" the stream, versus this traffic going over the system account (`cluster_traffic: system`/default). When pushing an updated JWT to all servers, all these servers would correctly update their `cluster_traffic` setting. However, if a server was restarted it would "forget" `cluster_traffic: owner` was set, and revert back to `cluster_traffic: system`. This would put this single server to be unable to communicate with the remainder of the cluster. Restarting the rest of the cluster would have them also revert back to `cluster_traffic: system`, which would allow them to communicate again. But, without respecting the `cluster_traffic: owner` setting on the account. This PR fixes that by ensuring `cluster_traffic` can be updated at the same time as that JetStream is enabled for that particular account upon startup. Due to this issue, if a server with this fix is deployed, it will not be able to communicate with the other servers that had reverted back to `cluster_traffic: system`. A clean upgrade path for this would be: - Temporarily update the account to have `cluster_traffic: system`. If all servers were restarted they were already on this setting. Any servers that were not yet restarted will now agree on this setting. - Upgrade all servers to the new server version with this fix. - Update the account to have `cluster_traffic: owner` again. It should now be remembered even after a server restart. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 149a096 + 6005a5f commit d034f64

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

server/accounts.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3784,6 +3784,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
37843784
// Absent reload of js server cfg, this is going to be broken until js is disabled
37853785
a.incomplete = true
37863786
a.mu.Unlock()
3787+
} else {
3788+
a.mu.Lock()
3789+
// Refresh reference, we've just enabled JetStream, so it would have been nil before.
3790+
ajs = a.js
3791+
a.mu.Unlock()
37873792
}
37883793
} else if a.jsLimits != nil {
37893794
// We do not have JS enabled for this server, but the account has it enabled so setup

server/jetstream_jwt_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,6 +1821,91 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
18211821
}
18221822
}
18231823

1824+
func TestJetStreamJWTClusterAccountNRGPersistsAfterRestart(t *testing.T) {
1825+
_, syspub := createKey(t)
1826+
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
1827+
1828+
aExpKp, aExpPub := createKey(t)
1829+
accClaim := jwt.NewAccountClaims(aExpPub)
1830+
accClaim.Name = "acc"
1831+
accClaim.ClusterTraffic = jwt.ClusterTrafficOwner
1832+
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1}
1833+
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1}
1834+
accJwt := encodeClaim(t, accClaim, aExpPub)
1835+
accCreds := newUser(t, aExpKp)
1836+
1837+
tmlp := `
1838+
listen: 127.0.0.1:-1
1839+
server_name: %s
1840+
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
1841+
leaf {
1842+
listen: 127.0.0.1:-1
1843+
}
1844+
cluster {
1845+
name: %s
1846+
listen: 127.0.0.1:%d
1847+
routes = [%s]
1848+
}
1849+
` + fmt.Sprintf(`
1850+
operator: %s
1851+
system_account: %s
1852+
resolver = MEMORY
1853+
resolver_preload = {
1854+
%s : %s
1855+
%s : %s
1856+
}
1857+
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
1858+
1859+
c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
1860+
defer c.shutdown()
1861+
1862+
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
1863+
_, err := jsStreamCreate(t, nc, &StreamConfig{
1864+
Name: "TEST",
1865+
Replicas: 3,
1866+
Storage: FileStorage,
1867+
})
1868+
require_NoError(t, err)
1869+
1870+
// The account had cluster traffic set to "owner" already. Restarting servers should remember this setting.
1871+
for _, s := range c.servers {
1872+
acc, err := s.lookupAccount(aExpPub)
1873+
require_NoError(t, err)
1874+
1875+
// Check that everything looks like it should.
1876+
require_True(t, acc != nil)
1877+
require_True(t, acc.js != nil)
1878+
require_Equal(t, acc.nrgAccount, aExpPub)
1879+
1880+
// Now get a list of all the Raft nodes that should have the correct cluster traffic set.
1881+
s.rnMu.Lock()
1882+
raftNodes := make([]*raft, 0, len(s.raftNodes))
1883+
for _, n := range s.raftNodes {
1884+
rg := n.(*raft)
1885+
if rg.accName != acc.Name {
1886+
continue
1887+
}
1888+
raftNodes = append(raftNodes, rg)
1889+
}
1890+
s.rnMu.Unlock()
1891+
1892+
// Get the Raftz state also.
1893+
rz := s.Raftz(&RaftzOptions{AccountFilter: aExpPub})
1894+
require_NotNil(t, rz)
1895+
rza := (*rz)[aExpPub]
1896+
require_NotNil(t, rza)
1897+
1898+
for _, rg := range raftNodes {
1899+
rg.Lock()
1900+
rgAcc := rg.acc
1901+
rg.Unlock()
1902+
require_Equal(t, rgAcc.Name, aExpPub)
1903+
require_Equal(t, rza[rg.group].SystemAcc, false)
1904+
require_Equal(t, rza[rg.group].TrafficAcc, aExpPub)
1905+
}
1906+
}
1907+
}
1908+
18241909
func TestJetStreamJWTUpdateWithPreExistingStream(t *testing.T) {
18251910
updateJwt := func(url string, creds string, pubKey string, jwt string) {
18261911
t.Helper()

0 commit comments

Comments
 (0)