Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b203ef7
feat(sharding)_: add shards config
plopezlpz May 11, 2025
72d75aa
feat(sharding)_: listen to new community shards together with old ones
plopezlpz May 11, 2025
37ef179
feat(sharding)_: linter
plopezlpz May 12, 2025
9db6cb8
feat(sharding)_: add chat specific filters
plopezlpz May 12, 2025
6634a74
feat(sharding)_: missing filters
plopezlpz May 12, 2025
b4aec7b
feat(sharding)_: stage2 sending in new communities shard
plopezlpz May 19, 2025
5efe5cb
chore_: Merge branch 'develop' into feat/communities-in-own-shard-1
plopezlpz Jun 3, 2025
1db5780
fix_: fix filters
plopezlpz Jun 3, 2025
63782e2
chore_: Merge branch 'feat/communities-in-own-shard-1' into feature/c…
plopezlpz Jun 3, 2025
4f3eade
chore_: merge branch 'develop' into feat/communities-in-own-shard-1
plopezlpz Aug 19, 2025
787a7d4
chore_: merge conflicts
plopezlpz Aug 19, 2025
ba90867
chore_: use right type
plopezlpz Aug 19, 2025
a553d1a
fix: community store node request
plopezlpz Aug 20, 2025
023cabc
chore: merge develop into feat/communities-in-own-shard-1
plopezlpz Aug 20, 2025
85170ab
fix: lint
plopezlpz Aug 20, 2025
f927996
fix: lint
plopezlpz Aug 20, 2025
4744db8
chore: merge remote-tracking branch origin/develop into feat/communit…
plopezlpz Aug 20, 2025
d9e3ff7
chore: merge feat/communities-in-own-shard-1 into feature/communities…
plopezlpz Aug 21, 2025
5286e56
chore: merge
plopezlpz Aug 21, 2025
51eeab2
fix: merge conflicts
plopezlpz Aug 21, 2025
0aaa81a
fix: remove print
plopezlpz Aug 21, 2025
18dc2c6
fix: add shard to tests
plopezlpz Aug 21, 2025
d2bf53e
chore: merge branch develop into feat/communities-in-own-shard-1
plopezlpz Aug 22, 2025
d2bd5a1
chore: merge branch feat/communities-in-own-shard-1 into feature/comm…
plopezlpz Aug 22, 2025
1748e23
fix: overriding filters
plopezlpz Aug 22, 2025
9b8f350
chore: merge branch feat/communities-in-own-shard-1 into feature/comm…
plopezlpz Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions messaging/adapters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ func ChatsToInitializeToTransport(c types.ChatsToInitialize) []transport.Filters
filters[i] = transport.FiltersToInitialize{
ChatID: chat.ChatID,
PubsubTopic: chat.PubsubTopic,
// TODO (#6384) temporary flag while migrating community shards
IsCommunity: chat.IsCommunity,
}
}
return filters
Expand Down
4 changes: 2 additions & 2 deletions messaging/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ func (a *API) ConnectionChanged(state connection.State) {
a.core.connectionChanged(state)
}

func (a *API) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
return a.core.transport.SubscribeToPubsubTopic(topic, optPublicKey)
func (a *API) SubscribeToPubsubTopic(topic string, optPublicKey ...*ecdsa.PublicKey) error {
return a.core.transport.SubscribeToPubsubTopic(topic, optPublicKey...)
}

func (a *API) UnsubscribeFromPubsubTopic(topic string) error {
Expand Down
89 changes: 67 additions & 22 deletions messaging/layers/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"strings"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -127,13 +129,15 @@ func (f *FiltersManager) Init(
type FiltersToInitialize struct {
ChatID string
PubsubTopic string
// TODO (#6384) temporary flag while migrating community shards
IsCommunity bool
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.IsCommunity)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -161,6 +165,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com

topics := make([]string, 0)
topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic())
topics = append(topics, wakuv2.GlobalCommunityControlPubsubTopic())
topics = append(topics, wakuv2.GlobalCommunityContentPubsubTopic())
topics = append(topics, communityFilter.Shard.PubsubTopic())

for _, pubsubTopic := range topics {
Expand Down Expand Up @@ -219,7 +225,16 @@ func (f *FiltersManager) Filters() (result []*Filter) {
func (f *FiltersManager) Filter(chatID string) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.filters[chatID]

// find the first filter that matches this chat ID
// TODO this is temporary so not changing the return type, otherwise we should return a slice
for key, filter := range f.filters {
if strings.HasPrefix(key, chatID) {
return filter
}
}

return nil
}

// FilterByFilterID returns a Filter with a given Whisper filter ID.
Expand Down Expand Up @@ -275,19 +290,23 @@ func (f *FiltersManager) FilterByChatID(chatID string) *Filter {
return f.filters[chatID]
}

// Remove remove all the filters associated with a chat/identity
func (f *FiltersManager) Remove(ctx context.Context, filters ...*Filter) error {
// Remove removes all the filtersToRemove
func (f *FiltersManager) Remove(ctx context.Context, filtersToRemove ...*Filter) error {
f.mutex.Lock()
defer f.mutex.Unlock()

for _, filter := range filters {
for _, filter := range filtersToRemove {
if err := f.service.Unsubscribe(ctx, filter.FilterID); err != nil {
return err
}
if filter.SymKeyID != "" {
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(f.filters, filter.ChatID)
for k, v := range f.filters {
if filter.FilterID == v.FilterID {
delete(f.filters, k)
}
}
}

return nil
Expand All @@ -300,10 +319,10 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
var filterIDs []string
var filters []*Filter

for _, f := range filters {
if !f.Listen {
filterIDs = append(filterIDs, f.FilterID)
filters = append(filters, f)
for _, v := range f.filters {
if !v.Listen {
filterIDs = append(filterIDs, v.FilterID)
filters = append(filters, v)
}
}
if err := f.service.UnsubscribeMany(filterIDs); err != nil {
Expand All @@ -314,30 +333,40 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
if filter.SymKeyID != "" {
f.service.DeleteSymKey(filter.SymKeyID)
}
delete(f.filters, filter.ChatID)
for k, v := range f.filters {
if filter.FilterID == v.FilterID {
delete(f.filters, k)
}
}
}

return nil
}

// Remove remove all the filters associated with a chat/identity
// RemoveFilterByChatID removes the filters associated with a chat/identity
func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
// TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport

toRemove := make([]*Filter, 0)
f.mutex.Lock()
filter, ok := f.filters[chatID]
for _, filter := range f.filters {
if filter.ChatID == chatID {
toRemove = append(toRemove, filter)
}
}
f.mutex.Unlock()

if !ok {
if len(toRemove) == 0 {
return nil, nil
}

err := f.Remove(context.Background(), filter)
err := f.Remove(context.Background(), toRemove...)
if err != nil {
return nil, err
}

return filter, nil
// TODO temporary so not changing the return type, otherwise we should return a slice
return toRemove[0], nil
}

// LoadPartitioned creates a filter for a partitioned topic.
Expand Down Expand Up @@ -513,12 +542,17 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
return f.filters[personalDiscoveryTopic]
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
// LoadPublic adds a filter for a public chat with specific pubsubTopic
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, isCommunity ...bool) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
filterKey := chatID
if len(isCommunity) > 0 && isCommunity[0] {
filterKey = toCommunityFilterKey(chatID, pubsubTopic)
}

if chat, ok := f.filters[filterKey]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -527,9 +561,8 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[filterKey] = chat
}

return chat, nil
}

Expand All @@ -549,10 +582,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
OneToOne: false,
}

f.filters[chatID] = chat
f.filters[filterKey] = chat

f.logger.Debug("registering filter for",
zap.String("chatID", chatID),
zap.String("filterKey", filterKey),
zap.Bool("isCommunity", len(isCommunity) > 0 && isCommunity[0]),
zap.String("type", "public"),
zap.String("ContentTopic", filterAndTopic.Topic.String()),
zap.String("PubsubTopic", pubsubTopic),
Expand Down Expand Up @@ -680,3 +715,13 @@ func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter {

return f.filters[NegotiatedTopic(identity)]
}

// toCommunityFilterKey creates a unique key for filters map using chatID and pubsubTopic
//
// to allow one chat to have multiple filters in different pubsubTopics so that we can migrate the communities to 128 and 256 shards
func toCommunityFilterKey(chatID string, pubsubTopic string) string {
if pubsubTopic == "" {
return chatID
}
return fmt.Sprintf("%s::%s", chatID, pubsubTopic)
}
24 changes: 21 additions & 3 deletions messaging/layers/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transport
import (
"context"
"crypto/ecdsa"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -348,7 +349,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *wakuty
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -507,8 +508,25 @@ func (t *Transport) ConnectionChanged(state connection.State) {
}

// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
return t.waku.SubscribeToPubsubTopic(topic, optPublicKey)
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKeys ...*ecdsa.PublicKey) error {
if len(optPublicKeys) > 1 {
return fmt.Errorf("at most one public key can be passed, pub keys: %v", len(optPublicKeys))
}
var pubKey *ecdsa.PublicKey
if len(optPublicKeys) == 1 {
pubKey = optPublicKeys[0]
} else {
// try to retrieve pubkey for pubsubtopic if none provided
privK, err := t.RetrievePubsubTopicKey(topic)
if err != nil {
return err
}
if privK != nil {
pubKey = &privK.PublicKey
}
}
t.logger.Debug("subscribing to protected pubsub topic", zap.String("pubsubtopic", topic), zap.Any("pubkey", pubKey))
return t.waku.SubscribeToPubsubTopic(topic, pubKey)
}

// Unsubscribe from a pubsub topic
Expand Down
2 changes: 2 additions & 0 deletions messaging/types/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
type ChatToInitialize struct {
ChatID string
PubsubTopic string
// TODO (#6384) temporary flag while migrating community shards
IsCommunity bool
}

type ChatsToInitialize []*ChatToInitialize
Expand Down
36 changes: 36 additions & 0 deletions messaging/types/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,49 @@ func DefaultShardPubsubTopic() string {
return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
}

func DefaultShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: DefaultShardIndex,
}
}

func DefaultNonProtectedShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
}
}

// TODO this is used only for community control messages, we need to stop using it once migration is done
func DefaultNonProtectedPubsubTopic() string {
return DefaultNonProtectedShard().PubsubTopic()
}

// GlobalCommunityControlShard returns the shard for the global community control messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329
func GlobalCommunityControlShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 128,
}
}

// GlobalCommunityContentShard returns the shard for the global community content messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330
func GlobalCommunityContentShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 256,
}
}

func GlobalCommunityControlPubsubTopic() string {
return GlobalCommunityControlShard().PubsubTopic()
}

func GlobalCommunityContentPubsubTopic() string {
return GlobalCommunityContentShard().PubsubTopic()
}
36 changes: 36 additions & 0 deletions messaging/waku/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ func DefaultNonProtectedShard() *Shard {
}
}

// TODO this is used only for community control messages, we need to stop using it once migration is done
func DefaultNonProtectedPubsubTopic() string {
return DefaultNonProtectedShard().PubsubTopic()
}

func DefaultShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: DefaultShardIndex,
}
}

// GlobalCommunityControlShard returns the shard for the global community control messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329
func GlobalCommunityControlShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 128,
}
}

// GlobalCommunityContentShard returns the shard for the global community content messages
//
// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330
func GlobalCommunityContentShard() *Shard {
return &Shard{
Cluster: MainStatusShardCluster,
Index: 256,
}
}

func GlobalCommunityControlPubsubTopic() string {
return GlobalCommunityControlShard().PubsubTopic()
}

func GlobalCommunityContentPubsubTopic() string {
return GlobalCommunityContentShard().PubsubTopic()
}
10 changes: 8 additions & 2 deletions protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,8 +1552,14 @@ func (o *Community) MemberUpdateChannelID() string {
return o.IDString() + "-memberUpdate"
}

func (o *Community) PubsubTopic() string {
return o.Shard().PubsubTopic()
func (o *Community) PubsubTopic(fallbackPubsubTopic ...string) string {
if o.Shard().PubsubTopic() != "" {
return o.Shard().PubsubTopic()
}
if len(fallbackPubsubTopic) > 0 {
return fallbackPubsubTopic[0]
}
return ""
}

func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey {
Expand Down
2 changes: 1 addition & 1 deletion protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
Recipients: pubkeys,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
}
_, err := ckd.messaging.SendCommunityMessage(context.Background(), &rawMessage)

Expand Down
Loading