Skip to content

Commit 72d75aa

Browse files
committed
feat(sharding)_: listen to new community shards together with old ones
1 parent b203ef7 commit 72d75aa

File tree

7 files changed

+155
-70
lines changed

7 files changed

+155
-70
lines changed

messaging/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ func (a *API) ConnectionChanged(state connection.State) {
234234
a.transport.ConnectionChanged(state)
235235
}
236236

237-
func (a *API) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
238-
return a.transport.SubscribeToPubsubTopic(topic, optPublicKey)
237+
func (a *API) SubscribeToPubsubTopic(topic string, optPublicKey ...*ecdsa.PublicKey) error {
238+
return a.transport.SubscribeToPubsubTopic(topic, optPublicKey...)
239239
}
240240

241241
func (a *API) UnsubscribeFromPubsubTopic(topic string) error {

messaging/transport/filters_manager.go

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"context"
66
"crypto/ecdsa"
77
"encoding/hex"
8+
"fmt"
9+
"strings"
810
"sync"
911

1012
"github.com/pkg/errors"
@@ -162,6 +164,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
162164

163165
topics := make([]string, 0)
164166
topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic())
167+
topics = append(topics, wakuv2.GlobalCommunityControlPubsubTopic())
168+
topics = append(topics, wakuv2.GlobalCommunityContentPubsubTopic())
165169
topics = append(topics, communityFilter.Shard.PubsubTopic())
166170

167171
for _, pubsubTopic := range topics {
@@ -220,7 +224,16 @@ func (f *FiltersManager) Filters() (result []*Filter) {
220224
func (f *FiltersManager) Filter(chatID string) *Filter {
221225
f.mutex.Lock()
222226
defer f.mutex.Unlock()
223-
return f.filters[chatID]
227+
228+
// find the first filter that matches this chat ID
229+
// TODO this is temporary so not changing the return type, otherwise we should return a slice
230+
for key, filter := range f.filters {
231+
if strings.HasPrefix(key, chatID) {
232+
return filter
233+
}
234+
}
235+
236+
return nil
224237
}
225238

226239
// FilterByFilterID returns a Filter with a given Whisper filter ID.
@@ -276,19 +289,23 @@ func (f *FiltersManager) FilterByChatID(chatID string) *Filter {
276289
return f.filters[chatID]
277290
}
278291

279-
// Remove remove all the filters associated with a chat/identity
280-
func (f *FiltersManager) Remove(ctx context.Context, filters ...*Filter) error {
292+
// Remove removes all the filtersToRemove
293+
func (f *FiltersManager) Remove(ctx context.Context, filtersToRemove ...*Filter) error {
281294
f.mutex.Lock()
282295
defer f.mutex.Unlock()
283296

284-
for _, filter := range filters {
297+
for _, filter := range filtersToRemove {
285298
if err := f.service.Unsubscribe(ctx, filter.FilterID); err != nil {
286299
return err
287300
}
288301
if filter.SymKeyID != "" {
289302
f.service.DeleteSymKey(filter.SymKeyID)
290303
}
291-
delete(f.filters, filter.ChatID)
304+
for k, v := range f.filters {
305+
if filter.FilterID == v.FilterID {
306+
delete(f.filters, k)
307+
}
308+
}
292309
}
293310

294311
return nil
@@ -301,10 +318,10 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
301318
var filterIDs []string
302319
var filters []*Filter
303320

304-
for _, f := range filters {
305-
if !f.Listen {
306-
filterIDs = append(filterIDs, f.FilterID)
307-
filters = append(filters, f)
321+
for _, v := range f.filters {
322+
if !v.Listen {
323+
filterIDs = append(filterIDs, v.FilterID)
324+
filters = append(filters, v)
308325
}
309326
}
310327
if err := f.service.UnsubscribeMany(filterIDs); err != nil {
@@ -315,30 +332,40 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
315332
if filter.SymKeyID != "" {
316333
f.service.DeleteSymKey(filter.SymKeyID)
317334
}
318-
delete(f.filters, filter.ChatID)
335+
for k, v := range f.filters {
336+
if filter.FilterID == v.FilterID {
337+
delete(f.filters, k)
338+
}
339+
}
319340
}
320341

321342
return nil
322343
}
323344

324-
// Remove remove all the filters associated with a chat/identity
345+
// RemoveFilterByChatID removes the filters associated with a chat/identity
325346
func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
326347
// TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport
327348

349+
toRemove := make([]*Filter, 0)
328350
f.mutex.Lock()
329-
filter, ok := f.filters[chatID]
351+
for _, filter := range f.filters {
352+
if filter.ChatID == chatID {
353+
toRemove = append(toRemove, filter)
354+
}
355+
}
330356
f.mutex.Unlock()
331357

332-
if !ok {
358+
if len(toRemove) == 0 {
333359
return nil, nil
334360
}
335361

336-
err := f.Remove(context.Background(), filter)
362+
err := f.Remove(context.Background(), toRemove...)
337363
if err != nil {
338364
return nil, err
339365
}
340366

341-
return filter, nil
367+
// TODO temporary so not changing the return type, otherwise we should return a slice
368+
return toRemove[0], nil
342369
}
343370

344371
// LoadPartitioned creates a filter for a partitioned topic.
@@ -514,12 +541,14 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
514541
return f.filters[personalDiscoveryTopic]
515542
}
516543

517-
// LoadPublic adds a filter for a public chat.
544+
// LoadPublic adds a filter for a public chat with specific pubsubTopic
518545
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
519546
f.mutex.Lock()
520547
defer f.mutex.Unlock()
521548

522-
if chat, ok := f.filters[chatID]; ok {
549+
filterKey := toFilterKey(chatID, pubsubTopic)
550+
551+
if chat, ok := f.filters[filterKey]; ok {
523552
if chat.PubsubTopic != pubsubTopic {
524553
f.logger.Debug("updating pubsub topic for filter",
525554
zap.String("chatID", chatID),
@@ -528,9 +557,8 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
528557
zap.String("newTopic", pubsubTopic),
529558
)
530559
chat.PubsubTopic = pubsubTopic
531-
f.filters[chatID] = chat
560+
f.filters[filterKey] = chat
532561
}
533-
534562
return chat, nil
535563
}
536564

@@ -550,7 +578,7 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
550578
OneToOne: false,
551579
}
552580

553-
f.filters[chatID] = chat
581+
f.filters[filterKey] = chat
554582

555583
f.logger.Debug("registering filter for",
556584
zap.String("chatID", chatID),
@@ -681,3 +709,13 @@ func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter {
681709

682710
return f.filters[NegotiatedTopic(identity)]
683711
}
712+
713+
// toFilterKey creates a unique key for filters map using chatID and pubsubTopic
714+
//
715+
// to allow one chat to have multiple filters in different pubsubTopics so that we can migrate the communities to 128 and 256 shards
716+
func toFilterKey(chatID string, pubsubTopic string) string {
717+
if pubsubTopic == "" {
718+
return chatID
719+
}
720+
return fmt.Sprintf("%s::%s", chatID, pubsubTopic)
721+
}

messaging/transport/transport.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"database/sql"
7+
"fmt"
78
"sync"
89
"time"
910

@@ -515,8 +516,25 @@ func (t *Transport) ConnectionChanged(state connection.State) {
515516
}
516517

517518
// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
518-
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
519-
return t.waku.SubscribeToPubsubTopic(topic, optPublicKey)
519+
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKeys ...*ecdsa.PublicKey) error {
520+
if len(optPublicKeys) > 1 {
521+
return fmt.Errorf("at most one public key can be passed, pub keys: %v", len(optPublicKeys))
522+
}
523+
var pubKey *ecdsa.PublicKey
524+
if len(optPublicKeys) == 1 {
525+
pubKey = optPublicKeys[0]
526+
} else {
527+
// try to retrieve pubkey for pubsubtopic if none provided
528+
privK, err := t.RetrievePubsubTopicKey(topic)
529+
if err != nil {
530+
return err
531+
}
532+
if privK != nil {
533+
pubKey = &privK.PublicKey
534+
}
535+
}
536+
t.logger.Debug("subscribing to protected pubsub topic", zap.String("pubsubtopic", topic), zap.Any("pubkey", pubKey))
537+
return t.waku.SubscribeToPubsubTopic(topic, pubKey)
520538
}
521539

522540
// Unsubscribe from a pubsub topic

protocol/messenger_communities.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,20 +1045,7 @@ func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexByte
10451045
}
10461046

10471047
func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *wakuv2.Shard) error {
1048-
// TODO: this should probably be moved completely to transport once pubsub topic logic is implemented
1049-
pubsubTopic := shard.PubsubTopic()
1050-
1051-
privK, err := m.messaging.RetrievePubsubTopicKey(pubsubTopic)
1052-
if err != nil {
1053-
return err
1054-
}
1055-
1056-
var pubK *ecdsa.PublicKey
1057-
if privK != nil {
1058-
pubK = &privK.PublicKey
1059-
}
1060-
1061-
return m.messaging.SubscribeToPubsubTopic(pubsubTopic, pubK)
1048+
return m.messaging.SubscribeToPubsubTopic(shard.PubsubTopic())
10621049
}
10631050

10641051
func (m *Messenger) unsubscribeFromShard(shard *wakuv2.Shard) error {
@@ -2498,6 +2485,13 @@ func (m *Messenger) DefaultFilters(o *communities.Community) messaging.ChatsToIn
24982485
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
24992486
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
25002487
{ChatID: uncompressedPubKey, PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic()},
2488+
2489+
// While we migrate to 128 and 256 shards, we listen to community messages in the new shards as well as in the old ones. After migration we should remove the old ones
2490+
{ChatID: cID, PubsubTopic: wakuv2.GlobalCommunityControlPubsubTopic()},
2491+
{ChatID: updatesChannelID, PubsubTopic: wakuv2.GlobalCommunityControlPubsubTopic()},
2492+
{ChatID: mlChannelID, PubsubTopic: wakuv2.GlobalCommunityControlPubsubTopic()},
2493+
{ChatID: memberUpdateChannelID, PubsubTopic: wakuv2.GlobalCommunityContentPubsubTopic()}, // Making content since chat messages are sent in this contenttopic
2494+
{ChatID: uncompressedPubKey, PubsubTopic: wakuv2.GlobalCommunityContentPubsubTopic()},
25012495
}
25022496

25032497
return chats

protocol/messenger_filter_init.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,16 @@ func (m *Messenger) InitFilters() error {
2424
rand.Seed(time.Now().Unix())
2525

2626
// Community requests will arrive in this pubsub topic
27-
if err := m.SubscribeToPubsubTopic(wakuv2.DefaultNonProtectedPubsubTopic(), nil); err != nil {
27+
// TODO depracate
28+
if err := m.SubscribeToPubsubTopic(wakuv2.DefaultNonProtectedPubsubTopic()); err != nil {
29+
return err
30+
}
31+
// TODO only subscribe if interested in communities
32+
if err := m.SubscribeToPubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()); err != nil {
33+
return err
34+
}
35+
// TODO only subscribe if interested in communities
36+
if err := m.SubscribeToPubsubTopic(wakuv2.GlobalCommunityContentPubsubTopic()); err != nil {
2837
return err
2938
}
3039

protocol/messenger_peers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func (m *Messenger) ENR() (*enode.Node, error) {
4444
}
4545

4646
// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
47-
func (m *Messenger) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
48-
return m.messaging.SubscribeToPubsubTopic(topic, optPublicKey)
47+
func (m *Messenger) SubscribeToPubsubTopic(topic string, optPublicKey ...*ecdsa.PublicKey) error {
48+
return m.messaging.SubscribeToPubsubTopic(topic, optPublicKey...)
4949
}
5050

5151
func (m *Messenger) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {

protocol/messenger_store_node_request_manager.go

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -86,46 +86,72 @@ func (m *StoreNodeRequestManager) FetchCommunity(ctx context.Context, community
8686
zap.Any("community", community),
8787
zap.Any("config", cfg))
8888

89-
requestCommunity := func(communityID string, shard *wakuv2.Shard) (*communities.Community, StoreNodeRequestStats, error) {
90-
channel, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg)
91-
if err != nil {
92-
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err)
89+
fetch := func() (*communities.Community, StoreNodeRequestStats, error) {
90+
// if shard was not passed or nil, see if this community has a custom shard by requesting for this config to the store node
91+
communityCustomShard := community.Shard
92+
if communityCustomShard == nil {
93+
var err error
94+
communityCustomShard, err = m.fetchCustomShard(ctx, community.CommunityID, cfg, wakuv2.DefaultNonProtectedShard(), wakuv2.GlobalCommunityControlShard())
95+
if err != nil {
96+
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to fetch community shard: %w", err)
97+
}
9398
}
94-
95-
if !cfg.WaitForResponse {
96-
return nil, StoreNodeRequestStats{}, nil
99+
if communityCustomShard != nil {
100+
return m.fetchCommunity(ctx, community.CommunityID, cfg, communityCustomShard)
97101
}
102+
// request community in 32 and 64 shards
103+
return m.fetchCommunity(ctx, community.CommunityID, cfg, wakuv2.DefaultShard(), wakuv2.GlobalCommunityControlShard())
104+
}
98105

99-
result := <-channel
100-
return result.community, result.stats, result.err
106+
if !cfg.WaitForResponse {
107+
go fetch()
108+
return nil, StoreNodeRequestStats{}, nil
101109
}
110+
return fetch()
111+
}
102112

103-
// if shard was not passed or nil, request shard first
104-
communityShard := community.Shard
105-
if communityShard == nil {
106-
id := messaging.CommunityShardInfoTopic(community.CommunityID)
107-
fetchedShard, err := m.subscribeToRequest(ctx, storeNodeShardRequest, id, wakuv2.DefaultNonProtectedShard(), cfg)
113+
// fetchCommunity fetches a community from the store node.
114+
//
115+
// Since now we could have this community description in shards 32 and 128 we are doing the following:
116+
// 1. trying to fetch in shards 32 and 128
117+
// 2. returning the first non-nil community result
118+
//
119+
// Eventually we should just go to shard 128, once full migration to Global Community Control and Content Topic is done.
120+
func (m *StoreNodeRequestManager) fetchCommunity(ctx context.Context, communityID string, cfg StoreNodeRequestConfig, shards ...*wakuv2.Shard) (*communities.Community, StoreNodeRequestStats, error) {
121+
for _, shard := range shards {
122+
sub, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg)
108123
if err != nil {
109124
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err)
110125
}
111-
112-
if !cfg.WaitForResponse {
113-
go func() {
114-
defer gocommon.LogOnPanic()
115-
shardResult := <-fetchedShard
116-
communityShard = shardResult.shard
117-
118-
_, _, _ = requestCommunity(community.CommunityID, communityShard)
119-
}()
120-
return nil, StoreNodeRequestStats{}, nil
126+
// not doing this concurrently to avoid two filters for the same communityID, not a big deal since it is at most 2 shards and temporary
127+
res := <-sub
128+
if res.community != nil {
129+
return res.community, res.stats, res.err
121130
}
122-
123-
shardResult := <-fetchedShard
124-
communityShard = shardResult.shard
125131
}
132+
return nil, StoreNodeRequestStats{}, nil
133+
}
126134

127-
// request community with on shard
128-
return requestCommunity(community.CommunityID, communityShard)
135+
// fetchCustomShard fetches a community custom shard from the store node.
136+
//
137+
// Since now we could have communities' shard info in shards 64 and 128 we are doing the following:
138+
// 1. trying to fetch in shards 64 and 128
139+
// 2. returning the first non-nil shard result
140+
//
141+
// Eventually we should just go to shard 128, once full migration to Global Community Control and Content Topic is done.
142+
func (m *StoreNodeRequestManager) fetchCustomShard(ctx context.Context, communityID string, cfg StoreNodeRequestConfig, shards ...*wakuv2.Shard) (*wakuv2.Shard, error) {
143+
for _, shard := range shards {
144+
sub, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, messaging.CommunityShardInfoTopic(communityID), shard, cfg)
145+
if err != nil {
146+
return nil, fmt.Errorf("failed to create a shard info request: %w", err)
147+
}
148+
// not doing this concurrently to avoid two filters for the same CommunityShardInfoTopic, not a big deal since it is at most 2 shards and temporary
149+
res := <-sub
150+
if res.community != nil {
151+
return shard, nil
152+
}
153+
}
154+
return nil, nil
129155
}
130156

131157
// FetchCommunities makes a FetchCommunity for each element in given `communities` list.

0 commit comments

Comments
 (0)