-
Notifications
You must be signed in to change notification settings - Fork 264
feat: communities-in-own-shard-1 #6573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b203ef7
72d75aa
37ef179
9db6cb8
6634a74
5efe5cb
1db5780
4f3eade
787a7d4
ba90867
a553d1a
023cabc
85170ab
f927996
4744db8
d2bf53e
1748e23
2b37bca
c51b549
3b70d68
a7e7f24
8bd2b03
a0af8df
4bf6350
ebc5516
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,8 @@ import ( | |
| "context" | ||
| "crypto/ecdsa" | ||
| "encoding/hex" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
|
|
||
| "github.com/pkg/errors" | ||
|
|
@@ -101,7 +103,7 @@ func (f *FiltersManager) Init( | |
|
|
||
| // Add public, one-to-one and negotiated filters. | ||
| for _, fi := range filtersToInit { | ||
| _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) | ||
| _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, false) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -127,13 +129,15 @@ func (f *FiltersManager) Init( | |
| type FiltersToInitialize struct { | ||
| ChatID string | ||
| PubsubTopic string | ||
| // TODO (#6384) temporary flag while migrating community shards | ||
| DistinctByPubsub bool | ||
| } | ||
|
|
||
| func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { | ||
| var filters []*Filter | ||
| // Add public, one-to-one and negotiated filters. | ||
| for _, pf := range publicFiltersToInit { | ||
| f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) | ||
| f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.DistinctByPubsub) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -161,6 +165,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com | |
|
|
||
| topics := make([]string, 0) | ||
| topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic()) | ||
| topics = append(topics, wakuv2.GlobalCommunityControlPubsubTopic()) | ||
| topics = append(topics, wakuv2.GlobalCommunityContentPubsubTopic()) | ||
| topics = append(topics, communityFilter.Shard.PubsubTopic()) | ||
|
|
||
| for _, pubsubTopic := range topics { | ||
|
|
@@ -219,7 +225,16 @@ func (f *FiltersManager) Filters() (result []*Filter) { | |
| func (f *FiltersManager) Filter(chatID string) *Filter { | ||
| f.mutex.Lock() | ||
| defer f.mutex.Unlock() | ||
| return f.filters[chatID] | ||
|
|
||
| // find the first filter that matches this chat ID | ||
| // TODO this is temporary so not changing the return type, otherwise we should return a slice | ||
| for key, filter := range f.filters { | ||
| if strings.HasPrefix(key, chatID) { | ||
| return filter | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // FilterByFilterID returns a Filter with a given Whisper filter ID. | ||
|
|
@@ -275,19 +290,23 @@ func (f *FiltersManager) FilterByChatID(chatID string) *Filter { | |
| return f.filters[chatID] | ||
| } | ||
|
|
||
| // Remove remove all the filters associated with a chat/identity | ||
| func (f *FiltersManager) Remove(ctx context.Context, filters ...*Filter) error { | ||
| // Remove removes all the filtersToRemove | ||
| func (f *FiltersManager) Remove(ctx context.Context, filtersToRemove ...*Filter) error { | ||
| f.mutex.Lock() | ||
| defer f.mutex.Unlock() | ||
|
|
||
| for _, filter := range filters { | ||
| for _, filter := range filtersToRemove { | ||
| if err := f.service.Unsubscribe(ctx, filter.FilterID); err != nil { | ||
| return err | ||
| } | ||
| if filter.SymKeyID != "" { | ||
| f.service.DeleteSymKey(filter.SymKeyID) | ||
| } | ||
| delete(f.filters, filter.ChatID) | ||
| for k, v := range f.filters { | ||
| if filter.FilterID == v.FilterID { | ||
| delete(f.filters, k) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
|
|
@@ -300,10 +319,10 @@ func (f *FiltersManager) RemoveNoListenFilters() error { | |
| var filterIDs []string | ||
| var filters []*Filter | ||
|
|
||
| for _, f := range filters { | ||
| if !f.Listen { | ||
| filterIDs = append(filterIDs, f.FilterID) | ||
| filters = append(filters, f) | ||
| for _, v := range f.filters { | ||
| if !v.Listen { | ||
| filterIDs = append(filterIDs, v.FilterID) | ||
| filters = append(filters, v) | ||
| } | ||
| } | ||
| if err := f.service.UnsubscribeMany(filterIDs); err != nil { | ||
|
|
@@ -314,30 +333,40 @@ func (f *FiltersManager) RemoveNoListenFilters() error { | |
| if filter.SymKeyID != "" { | ||
| f.service.DeleteSymKey(filter.SymKeyID) | ||
| } | ||
| delete(f.filters, filter.ChatID) | ||
| for k, v := range f.filters { | ||
| if filter.FilterID == v.FilterID { | ||
| delete(f.filters, k) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Remove remove all the filters associated with a chat/identity | ||
| // RemoveFilterByChatID removes the filters associated with a chat/identity | ||
| func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) { | ||
| // TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport | ||
|
|
||
| toRemove := make([]*Filter, 0) | ||
| f.mutex.Lock() | ||
| filter, ok := f.filters[chatID] | ||
| for _, filter := range f.filters { | ||
| if filter.ChatID == chatID { | ||
| toRemove = append(toRemove, filter) | ||
| } | ||
| } | ||
| f.mutex.Unlock() | ||
|
|
||
| if !ok { | ||
| if len(toRemove) == 0 { | ||
| return nil, nil | ||
| } | ||
|
|
||
| err := f.Remove(context.Background(), filter) | ||
| err := f.Remove(context.Background(), toRemove...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return filter, nil | ||
| // TODO temporary so not changing the return type, otherwise we should return a slice | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This (and most of the changes here) will be reverted once community's pubsub topics are migrated, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, the thing now is that we have to start "listening" in these pubsubs then when we start publishing in those these nodes will not miss messages |
||
| return toRemove[0], nil | ||
| } | ||
|
|
||
| // LoadPartitioned creates a filter for a partitioned topic. | ||
|
|
@@ -513,12 +542,17 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { | |
| return f.filters[personalDiscoveryTopic] | ||
| } | ||
|
|
||
| // LoadPublic adds a filter for a public chat. | ||
| func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { | ||
| // LoadPublic adds a filter for a public chat with specific pubsubTopic | ||
| func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, distinctByPubsub bool) (*Filter, error) { | ||
| f.mutex.Lock() | ||
| defer f.mutex.Unlock() | ||
|
|
||
| if chat, ok := f.filters[chatID]; ok { | ||
| filterKey := chatID | ||
| if distinctByPubsub { | ||
| filterKey = concatFilterKey(chatID, pubsubTopic) | ||
| } | ||
|
|
||
| if chat, ok := f.filters[filterKey]; ok { | ||
| if chat.PubsubTopic != pubsubTopic { | ||
| f.logger.Debug("updating pubsub topic for filter", | ||
| zap.String("chatID", chatID), | ||
|
|
@@ -527,9 +561,8 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, | |
| zap.String("newTopic", pubsubTopic), | ||
| ) | ||
| chat.PubsubTopic = pubsubTopic | ||
| f.filters[chatID] = chat | ||
| f.filters[filterKey] = chat | ||
| } | ||
|
|
||
| return chat, nil | ||
| } | ||
|
|
||
|
|
@@ -549,10 +582,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, | |
| OneToOne: false, | ||
| } | ||
|
|
||
| f.filters[chatID] = chat | ||
| f.filters[filterKey] = chat | ||
|
|
||
| f.logger.Debug("registering filter for", | ||
| zap.String("chatID", chatID), | ||
| zap.String("filterKey", filterKey), | ||
| zap.Bool("distinctByPubsub", distinctByPubsub), | ||
| zap.String("type", "public"), | ||
| zap.String("ContentTopic", filterAndTopic.Topic.String()), | ||
| zap.String("PubsubTopic", pubsubTopic), | ||
|
|
@@ -680,3 +715,13 @@ func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter { | |
|
|
||
| return f.filters[NegotiatedTopic(identity)] | ||
| } | ||
|
|
||
| // toCommunityFilterKey creates a unique key for filters map using chatID and pubsubTopic | ||
| // | ||
| // to allow one chat to have multiple filters in different pubsubTopics so that we can migrate the communities to 128 and 256 shards | ||
| func concatFilterKey(chatID string, pubsubTopic string) string { | ||
| if pubsubTopic == "" { | ||
| return chatID | ||
| } | ||
| return fmt.Sprintf("%s::%s", chatID, pubsubTopic) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,13 +55,49 @@ func DefaultShardPubsubTopic() string { | |
| return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() | ||
| } | ||
|
|
||
| func DefaultShard() *Shard { | ||
| return &Shard{ | ||
| Cluster: MainStatusShardCluster, | ||
| Index: DefaultShardIndex, | ||
| } | ||
| } | ||
|
|
||
| func DefaultNonProtectedShard() *Shard { | ||
| return &Shard{ | ||
| Cluster: MainStatusShardCluster, | ||
| Index: NonProtectedShardIndex, | ||
| } | ||
| } | ||
|
|
||
| // TODO this is used only for community control messages, we need to stop using it once migration is done | ||
| func DefaultNonProtectedPubsubTopic() string { | ||
| return DefaultNonProtectedShard().PubsubTopic() | ||
| } | ||
|
|
||
| // GlobalCommunityControlShard returns the shard for the global community control messages | ||
| // | ||
| // Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329 | ||
| func GlobalCommunityControlShard() *Shard { | ||
| return &Shard{ | ||
| Cluster: MainStatusShardCluster, | ||
| Index: 128, | ||
| } | ||
| } | ||
|
|
||
| // GlobalCommunityContentShard returns the shard for the global community content messages | ||
| // | ||
| // Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330 | ||
| func GlobalCommunityContentShard() *Shard { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering why it's duplicated with following There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found some packages are accessing one and some others the other |
||
| return &Shard{ | ||
| Cluster: MainStatusShardCluster, | ||
| Index: 256, | ||
| } | ||
| } | ||
|
|
||
| func GlobalCommunityControlPubsubTopic() string { | ||
| return GlobalCommunityControlShard().PubsubTopic() | ||
| } | ||
|
|
||
| func GlobalCommunityContentPubsubTopic() string { | ||
| return GlobalCommunityContentShard().PubsubTopic() | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.