Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b203ef7
feat(sharding)_: add shards config
plopezlpz May 11, 2025
72d75aa
feat(sharding)_: listen to new community shards together with old ones
plopezlpz May 11, 2025
37ef179
feat(sharding)_: linter
plopezlpz May 12, 2025
9db6cb8
feat(sharding)_: add chat specific filters
plopezlpz May 12, 2025
6634a74
feat(sharding)_: missing filters
plopezlpz May 12, 2025
5efe5cb
chore_: Merge branch 'develop' into feat/communities-in-own-shard-1
plopezlpz Jun 3, 2025
1db5780
fix_: fix filters
plopezlpz Jun 3, 2025
4f3eade
chore_: merge branch 'develop' into feat/communities-in-own-shard-1
plopezlpz Aug 19, 2025
787a7d4
chore_: merge conflicts
plopezlpz Aug 19, 2025
ba90867
chore_: use right type
plopezlpz Aug 19, 2025
a553d1a
fix: community store node request
plopezlpz Aug 20, 2025
023cabc
chore: merge develop into feat/communities-in-own-shard-1
plopezlpz Aug 20, 2025
85170ab
fix: lint
plopezlpz Aug 20, 2025
f927996
fix: lint
plopezlpz Aug 20, 2025
4744db8
chore: merge remote-tracking branch origin/develop into feat/communit…
plopezlpz Aug 20, 2025
d2bf53e
chore: merge branch develop into feat/communities-in-own-shard-1
plopezlpz Aug 22, 2025
1748e23
fix: overriding filters
plopezlpz Aug 22, 2025
2b37bca
Merge remote-tracking branch 'origin/develop' into feat/communities-i…
plopezlpz Sep 9, 2025
c51b549
fix: typo
plopezlpz Sep 9, 2025
3b70d68
fix: naming
plopezlpz Sep 14, 2025
a7e7f24
fix: naming
plopezlpz Sep 14, 2025
8bd2b03
fix: naming
plopezlpz Sep 14, 2025
a0af8df
Merge branch 'develop' into feat/communities-in-own-shard-1
plopezlpz Sep 14, 2025
4bf6350
fix: pr comments
plopezlpz Sep 14, 2025
ebc5516
fix: naming
plopezlpz Sep 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions messaging/adapters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ func ChatsToInitializeToTransport(c types.ChatsToInitialize) []transport.Filters
filters[i] = transport.FiltersToInitialize{
ChatID: chat.ChatID,
PubsubTopic: chat.PubsubTopic,
// TODO (#6384) temporary flag while migrating community shards
DistinctByPubsub: chat.IsCommunity,
}
}
return filters
Expand Down
91 changes: 68 additions & 23 deletions messaging/layers/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"strings"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -101,7 +103,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, false)
if err != nil {
return nil, err
}
Expand All @@ -127,13 +129,15 @@ func (f *FiltersManager) Init(
type FiltersToInitialize struct {
ChatID string
PubsubTopic string
// TODO (#6384) temporary flag while migrating community shards
DistinctByPubsub bool
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.DistinctByPubsub)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -161,6 +165,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com

topics := make([]string, 0)
topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic())
topics = append(topics, wakuv2.GlobalCommunityControlPubsubTopic())
topics = append(topics, wakuv2.GlobalCommunityContentPubsubTopic())
topics = append(topics, communityFilter.Shard.PubsubTopic())

for _, pubsubTopic := range topics {
Expand Down Expand Up @@ -219,7 +225,16 @@ func (f *FiltersManager) Filters() (result []*Filter) {
func (f *FiltersManager) Filter(chatID string) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.filters[chatID]

// find the first filter that matches this chat ID
// TODO this is temporary so not changing the return type, otherwise we should return a slice
for key, filter := range f.filters {
if strings.HasPrefix(key, chatID) {
return filter
}
}

return nil
}

// FilterByFilterID returns a Filter with a given Whisper filter ID.
Expand Down Expand Up @@ -275,19 +290,23 @@ func (f *FiltersManager) FilterByChatID(chatID string) *Filter {
return f.filters[chatID]
}

// Remove remove all the filters associated with a chat/identity
func (f *FiltersManager) Remove(ctx context.Context, filters ...*Filter) error {
// Remove removes all the filtersToRemove
func (f *FiltersManager) Remove(ctx context.Context, filtersToRemove ...*Filter) error {
f.mutex.Lock()
defer f.mutex.Unlock()

for _, filter := range filters {
for _, filter := range filtersToRemove {
if err := f.service.Unsubscribe(ctx, filter.FilterID); err != nil {
return err
}
if filter.SymKeyID != "" {
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(f.filters, filter.ChatID)
for k, v := range f.filters {
if filter.FilterID == v.FilterID {
delete(f.filters, k)
}
}
}

return nil
Expand All @@ -300,10 +319,10 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
var filterIDs []string
var filters []*Filter

for _, f := range filters {
if !f.Listen {
filterIDs = append(filterIDs, f.FilterID)
filters = append(filters, f)
for _, v := range f.filters {
if !v.Listen {
filterIDs = append(filterIDs, v.FilterID)
filters = append(filters, v)
}
}
if err := f.service.UnsubscribeMany(filterIDs); err != nil {
Expand All @@ -314,30 +333,40 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
if filter.SymKeyID != "" {
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(f.filters, filter.ChatID)
for k, v := range f.filters {
if filter.FilterID == v.FilterID {
delete(f.filters, k)
}
}
}

return nil
}

// Remove remove all the filters associated with a chat/identity
// RemoveFilterByChatID removes the filters associated with a chat/identity
func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
// TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport

toRemove := make([]*Filter, 0)
f.mutex.Lock()
filter, ok := f.filters[chatID]
for _, filter := range f.filters {
if filter.ChatID == chatID {
toRemove = append(toRemove, filter)
}
}
f.mutex.Unlock()

if !ok {
if len(toRemove) == 0 {
return nil, nil
}

err := f.Remove(context.Background(), filter)
err := f.Remove(context.Background(), toRemove...)
if err != nil {
return nil, err
}

return filter, nil
// TODO temporary so not changing the return type, otherwise we should return a slice
Copy link
Contributor

@osmaczko osmaczko Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and most of the changes here) will be reverted once community's pubsub topics are migrated, right?

Copy link
Contributor Author

@plopezlpz plopezlpz Sep 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the thing now is that we have to start "listening" in these pubsubs then when we start publishing in those these nodes will not miss messages

return toRemove[0], nil
}

// LoadPartitioned creates a filter for a partitioned topic.
Expand Down Expand Up @@ -513,12 +542,17 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
return f.filters[personalDiscoveryTopic]
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
// LoadPublic adds a filter for a public chat with specific pubsubTopic
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, distinctByPubsub bool) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
filterKey := chatID
if distinctByPubsub {
filterKey = concatFilterKey(chatID, pubsubTopic)
}

if chat, ok := f.filters[filterKey]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -527,9 +561,8 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[filterKey] = chat
}

return chat, nil
}

Expand All @@ -549,10 +582,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
OneToOne: false,
}

f.filters[chatID] = chat
f.filters[filterKey] = chat

f.logger.Debug("registering filter for",
zap.String("chatID", chatID),
zap.String("filterKey", filterKey),
zap.Bool("distinctByPubsub", distinctByPubsub),
zap.String("type", "public"),
zap.String("ContentTopic", filterAndTopic.Topic.String()),
zap.String("PubsubTopic", pubsubTopic),
Expand Down Expand Up @@ -680,3 +715,13 @@ func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter {

return f.filters[NegotiatedTopic(identity)]
}

// toCommunityFilterKey creates a unique key for filters map using chatID and pubsubTopic
//
// to allow one chat to have multiple filters in different pubsubTopics so that we can migrate the communities to 128 and 256 shards
func concatFilterKey(chatID string, pubsubTopic string) string {
if pubsubTopic == "" {
return chatID
}
return fmt.Sprintf("%s::%s", chatID, pubsubTopic)
}
22 changes: 18 additions & 4 deletions messaging/layers/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret ethtypes.NegotiatedSecret) (*
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", false)
}

func (t *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error) {
Expand Down Expand Up @@ -267,7 +267,7 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *wakutypes.NewMes
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *wakuty
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -508,7 +508,21 @@ func (t *Transport) ConnectionChanged(state connection.State) {

// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
return t.waku.SubscribeToPubsubTopic(topic, optPublicKey)
var pubKey *ecdsa.PublicKey
if optPublicKey != nil {
pubKey = optPublicKey
} else {
// try to retrieve pubkey for pubsubtopic if none provided
privK, err := t.RetrievePubsubTopicKey(topic)
if err != nil {
return err
}
if privK != nil {
pubKey = &privK.PublicKey
}
}
t.logger.Debug("subscribing to protected pubsub topic", zap.String("pubsubtopic", topic), zap.Any("pubkey", pubKey))
return t.waku.SubscribeToPubsubTopic(topic, pubKey)
}

// Unsubscribe from a pubsub topic
Expand Down
2 changes: 2 additions & 0 deletions messaging/types/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
type ChatToInitialize struct {
ChatID string
PubsubTopic string
// TODO (#6384) temporary flag while migrating community shards
IsCommunity bool
}

type ChatsToInitialize []*ChatToInitialize
Expand Down
36 changes: 36 additions & 0 deletions messaging/types/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,49 @@ func DefaultShardPubsubTopic() string {
return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
}

func DefaultShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: DefaultShardIndex,
}
}

func DefaultNonProtectedShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
}
}

// TODO this is used only for community control messages, we need to stop using it once migration is done
func DefaultNonProtectedPubsubTopic() string {
return DefaultNonProtectedShard().PubsubTopic()
}

// GlobalCommunityControlShard returns the shard for the global community control messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329
func GlobalCommunityControlShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 128,
}
}

// GlobalCommunityContentShard returns the shard for the global community content messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330
func GlobalCommunityContentShard() *Shard {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why it's duplicated with following messaging/waku/shard.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found some packages are accessing one and some others the other

return &Shard{
Cluster: MainStatusShardCluster,
Index: 256,
}
}

func GlobalCommunityControlPubsubTopic() string {
return GlobalCommunityControlShard().PubsubTopic()
}

func GlobalCommunityContentPubsubTopic() string {
return GlobalCommunityContentShard().PubsubTopic()
}
36 changes: 36 additions & 0 deletions messaging/waku/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ func DefaultNonProtectedShard() *Shard {
}
}

// TODO this is used only for community control messages, we need to stop using it once migration is done
func DefaultNonProtectedPubsubTopic() string {
return DefaultNonProtectedShard().PubsubTopic()
}

func DefaultShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: DefaultShardIndex,
}
}

// GlobalCommunityControlShard returns the shard for the global community control messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329
func GlobalCommunityControlShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 128,
}
}

// GlobalCommunityContentShard returns the shard for the global community content messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330
func GlobalCommunityContentShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 256,
}
}

func GlobalCommunityControlPubsubTopic() string {
return GlobalCommunityControlShard().PubsubTopic()
}

func GlobalCommunityContentPubsubTopic() string {
return GlobalCommunityContentShard().PubsubTopic()
}
Loading
Loading