diff --git a/messaging/adapters/filters.go b/messaging/adapters/filters.go index 6a43b94c083..05e25453edc 100644 --- a/messaging/adapters/filters.go +++ b/messaging/adapters/filters.go @@ -11,6 +11,8 @@ func ChatsToInitializeToTransport(c types.ChatsToInitialize) []transport.Filters filters[i] = transport.FiltersToInitialize{ ChatID: chat.ChatID, PubsubTopic: chat.PubsubTopic, + // TODO (#6384) temporary flag while migrating community shards + DistinctByPubsub: chat.IsCommunity, } } return filters diff --git a/messaging/layers/transport/filters_manager.go b/messaging/layers/transport/filters_manager.go index 83687a5b84e..44fe617e6bc 100644 --- a/messaging/layers/transport/filters_manager.go +++ b/messaging/layers/transport/filters_manager.go @@ -5,6 +5,8 @@ import ( "context" "crypto/ecdsa" "encoding/hex" + "fmt" + "strings" "sync" "github.com/pkg/errors" @@ -101,7 +103,7 @@ func (f *FiltersManager) Init( // Add public, one-to-one and negotiated filters. for _, fi := range filtersToInit { - _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic) + _, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, false) if err != nil { return nil, err } @@ -127,13 +129,15 @@ func (f *FiltersManager) Init( type FiltersToInitialize struct { ChatID string PubsubTopic string + // TODO (#6384) temporary flag while migrating community shards + DistinctByPubsub bool } func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) { var filters []*Filter // Add public, one-to-one and negotiated filters. for _, pf := range publicFiltersToInit { - f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) + f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.DistinctByPubsub) if err != nil { return nil, err } @@ -161,6 +165,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com topics := make([]string, 0) topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic()) + topics = append(topics, wakuv2.GlobalCommunityControlPubsubTopic()) + topics = append(topics, wakuv2.GlobalCommunityContentPubsubTopic()) topics = append(topics, communityFilter.Shard.PubsubTopic()) for _, pubsubTopic := range topics { @@ -219,7 +225,16 @@ func (f *FiltersManager) Filters() (result []*Filter) { func (f *FiltersManager) Filter(chatID string) *Filter { f.mutex.Lock() defer f.mutex.Unlock() - return f.filters[chatID] + + // find the first filter that matches this chat ID + // TODO this is temporary so not changing the return type, otherwise we should return a slice + for key, filter := range f.filters { + if strings.HasPrefix(key, chatID) { + return filter + } + } + + return nil } // FilterByFilterID returns a Filter with a given Whisper filter ID. @@ -275,19 +290,23 @@ func (f *FiltersManager) FilterByChatID(chatID string) *Filter { return f.filters[chatID] } -// Remove remove all the filters associated with a chat/identity -func (f *FiltersManager) Remove(ctx context.Context, filters ...*Filter) error { +// Remove removes all the filtersToRemove +func (f *FiltersManager) Remove(ctx context.Context, filtersToRemove ...*Filter) error { f.mutex.Lock() defer f.mutex.Unlock() - for _, filter := range filters { + for _, filter := range filtersToRemove { if err := f.service.Unsubscribe(ctx, filter.FilterID); err != nil { return err } if filter.SymKeyID != "" { f.service.DeleteSymKey(filter.SymKeyID) } - delete(f.filters, filter.ChatID) + for k, v := range f.filters { + if filter.FilterID == v.FilterID { + delete(f.filters, k) + } + } } return nil @@ -300,10 +319,10 @@ func (f *FiltersManager) RemoveNoListenFilters() error { var filterIDs []string var filters []*Filter - for _, f := range filters { - if !f.Listen { - filterIDs = append(filterIDs, f.FilterID) - filters = append(filters, f) + for _, v := range f.filters { + if !v.Listen { + filterIDs = append(filterIDs, v.FilterID) + filters = append(filters, v) } } if err := f.service.UnsubscribeMany(filterIDs); err != nil { @@ -314,30 +333,40 @@ func (f *FiltersManager) RemoveNoListenFilters() error { if filter.SymKeyID != "" { f.service.DeleteSymKey(filter.SymKeyID) } - delete(f.filters, filter.ChatID) + for k, v := range f.filters { + if filter.FilterID == v.FilterID { + delete(f.filters, k) + } + } } return nil } -// Remove remove all the filters associated with a chat/identity +// RemoveFilterByChatID removes the filters associated with a chat/identity func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) { // TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport + toRemove := make([]*Filter, 0) f.mutex.Lock() - filter, ok := f.filters[chatID] + for _, filter := range f.filters { + if filter.ChatID == chatID { + toRemove = append(toRemove, filter) + } + } f.mutex.Unlock() - if !ok { + if len(toRemove) == 0 { return nil, nil } - err := f.Remove(context.Background(), filter) + err := f.Remove(context.Background(), toRemove...) if err != nil { return nil, err } - return filter, nil + // TODO temporary so not changing the return type, otherwise we should return a slice + return toRemove[0], nil } // LoadPartitioned creates a filter for a partitioned topic. @@ -513,12 +542,17 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter { return f.filters[personalDiscoveryTopic] } -// LoadPublic adds a filter for a public chat. -func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) { +// LoadPublic adds a filter for a public chat with specific pubsubTopic +func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, distinctByPubsub bool) (*Filter, error) { f.mutex.Lock() defer f.mutex.Unlock() - if chat, ok := f.filters[chatID]; ok { + filterKey := chatID + if distinctByPubsub { + filterKey = concatFilterKey(chatID, pubsubTopic) + } + + if chat, ok := f.filters[filterKey]; ok { if chat.PubsubTopic != pubsubTopic { f.logger.Debug("updating pubsub topic for filter", zap.String("chatID", chatID), @@ -527,9 +561,8 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, zap.String("newTopic", pubsubTopic), ) chat.PubsubTopic = pubsubTopic - f.filters[chatID] = chat + f.filters[filterKey] = chat } - return chat, nil } @@ -549,10 +582,12 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, OneToOne: false, } - f.filters[chatID] = chat + f.filters[filterKey] = chat f.logger.Debug("registering filter for", zap.String("chatID", chatID), + zap.String("filterKey", filterKey), + zap.Bool("distinctByPubsub", distinctByPubsub), zap.String("type", "public"), zap.String("ContentTopic", filterAndTopic.Topic.String()), zap.String("PubsubTopic", pubsubTopic), @@ -680,3 +715,13 @@ func (f *FiltersManager) GetNegotiated(identity *ecdsa.PublicKey) *Filter { return f.filters[NegotiatedTopic(identity)] } + +// toCommunityFilterKey creates a unique key for filters map using chatID and pubsubTopic +// +// to allow one chat to have multiple filters in different pubsubTopics so that we can migrate the communities to 128 and 256 shards +func concatFilterKey(chatID string, pubsubTopic string) string { + if pubsubTopic == "" { + return chatID + } + return fmt.Sprintf("%s::%s", chatID, pubsubTopic) +} diff --git a/messaging/layers/transport/transport.go b/messaging/layers/transport/transport.go index e6d78f3a4ce..c986b6cb89f 100644 --- a/messaging/layers/transport/transport.go +++ b/messaging/layers/transport/transport.go @@ -185,7 +185,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret ethtypes.NegotiatedSecret) (* } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, "") + return t.filters.LoadPublic(chatID, "", false) } func (t *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) (*Filter, error) { @@ -267,7 +267,7 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *wakutypes.NewMes return nil, err } - filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic) + filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, false) if err != nil { return nil, err } @@ -348,7 +348,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *wakuty } // We load the filter to make sure we can post on it - filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic) + filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, true) if err != nil { return nil, err } @@ -508,7 +508,21 @@ func (t *Transport) ConnectionChanged(state connection.State) { // Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { - return t.waku.SubscribeToPubsubTopic(topic, optPublicKey) + var pubKey *ecdsa.PublicKey + if optPublicKey != nil { + pubKey = optPublicKey + } else { + // try to retrieve pubkey for pubsubtopic if none provided + privK, err := t.RetrievePubsubTopicKey(topic) + if err != nil { + return err + } + if privK != nil { + pubKey = &privK.PublicKey + } + } + t.logger.Debug("subscribing to protected pubsub topic", zap.String("pubsubtopic", topic), zap.Any("pubkey", pubKey)) + return t.waku.SubscribeToPubsubTopic(topic, pubKey) } // Unsubscribe from a pubsub topic diff --git a/messaging/types/filters.go b/messaging/types/filters.go index 2cc53ef5a2e..2d153c76d50 100644 --- a/messaging/types/filters.go +++ b/messaging/types/filters.go @@ -7,6 +7,8 @@ import ( type ChatToInitialize struct { ChatID string PubsubTopic string + // TODO (#6384) temporary flag while migrating community shards + IsCommunity bool } type ChatsToInitialize []*ChatToInitialize diff --git a/messaging/types/shard.go b/messaging/types/shard.go index 577ee3ab029..8c6700fc932 100644 --- a/messaging/types/shard.go +++ b/messaging/types/shard.go @@ -55,6 +55,13 @@ func DefaultShardPubsubTopic() string { return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() } +func DefaultShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: DefaultShardIndex, + } +} + func DefaultNonProtectedShard() *Shard { return &Shard{ Cluster: MainStatusShardCluster, @@ -62,6 +69,35 @@ func DefaultNonProtectedShard() *Shard { } } +// TODO this is used only for community control messages, we need to stop using it once migration is done func DefaultNonProtectedPubsubTopic() string { return DefaultNonProtectedShard().PubsubTopic() } + +// GlobalCommunityControlShard returns the shard for the global community control messages +// +// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329 +func GlobalCommunityControlShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: 128, + } +} + +// GlobalCommunityContentShard returns the shard for the global community content messages +// +// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330 +func GlobalCommunityContentShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: 256, + } +} + +func GlobalCommunityControlPubsubTopic() string { + return GlobalCommunityControlShard().PubsubTopic() +} + +func GlobalCommunityContentPubsubTopic() string { + return GlobalCommunityContentShard().PubsubTopic() +} diff --git a/messaging/waku/shard.go b/messaging/waku/shard.go index 1067bdea609..9016e4858c9 100644 --- a/messaging/waku/shard.go +++ b/messaging/waku/shard.go @@ -43,6 +43,42 @@ func DefaultNonProtectedShard() *Shard { } } +// TODO this is used only for community control messages, we need to stop using it once migration is done func DefaultNonProtectedPubsubTopic() string { return DefaultNonProtectedShard().PubsubTopic() } + +func DefaultShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: DefaultShardIndex, + } +} + +// GlobalCommunityControlShard returns the shard for the global community control messages +// +// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L329 +func GlobalCommunityControlShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: 128, + } +} + +// GlobalCommunityContentShard returns the shard for the global community content messages +// +// Specs: https://github.com/vacp2p/rfc-index/blob/8ee2a6d6b232838d83374c35e2413f84436ecf64/status/56/communities.md?plain=1#L330 +func GlobalCommunityContentShard() *Shard { + return &Shard{ + Cluster: MainStatusShardCluster, + Index: 256, + } +} + +func GlobalCommunityControlPubsubTopic() string { + return GlobalCommunityControlShard().PubsubTopic() +} + +func GlobalCommunityContentPubsubTopic() string { + return GlobalCommunityContentShard().PubsubTopic() +} diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 693258ec50f..2f90a41b0bf 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -939,10 +939,10 @@ func (m *Messenger) SpectatedCommunities() ([]*communities.Community, error) { func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Chat, error) { logger := m.logger.Named("initCommunityChats") - publicChatsToInit := m.DefaultFilters(community) chats := CreateCommunityChats(community, m.getTimesource()) + publicChatsToInit := m.DefaultFilters(community) filters, err := m.messaging.InitPublicChats(publicChatsToInit) if err != nil { logger.Debug("InitPublicChats error", zap.Error(err)) @@ -1032,20 +1032,7 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *messagi if !m.started { return nil } - // TODO: this should probably be moved completely to transport once pubsub topic logic is implemented - pubsubTopic := shard.PubsubTopic() - - privK, err := m.messaging.RetrievePubsubTopicKey(pubsubTopic) - if err != nil { - return err - } - - var pubK *ecdsa.PublicKey - if privK != nil { - pubK = &privK.PublicKey - } - - return m.messaging.SubscribeToPubsubTopic(pubsubTopic, pubK) + return m.messaging.SubscribeToPubsubTopic(shard.PubsubTopic(), nil) } func (m *Messenger) unsubscribeFromShard(shard *messagingtypes.Shard) error { @@ -2453,11 +2440,15 @@ func (m *Messenger) DefaultFilters(o *communities.Community) messagingtypes.Chat communityPubsubTopic := o.PubsubTopic() chats := messagingtypes.ChatsToInitialize{ - {ChatID: cID, PubsubTopic: communityPubsubTopic}, - {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: uncompressedPubKey, PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic()}, + {ChatID: cID, PubsubTopic: communityPubsubTopic, IsCommunity: true}, + {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic, IsCommunity: true}, + {ChatID: uncompressedPubKey, PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), IsCommunity: true}, + {ChatID: uncompressedPubKey, PubsubTopic: messagingtypes.GlobalCommunityContentPubsubTopic(), IsCommunity: true}, + } + if communityPubsubTopic == "" { + chats = append(chats, &messagingtypes.ChatToInitialize{ChatID: cID, PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), IsCommunity: true}) + chats = append(chats, &messagingtypes.ChatToInitialize{ChatID: memberUpdateChannelID, PubsubTopic: messagingtypes.GlobalCommunityContentPubsubTopic(), IsCommunity: true}) } - return chats } diff --git a/protocol/messenger_filter_init.go b/protocol/messenger_filter_init.go index f380caa5e94..a32624044ac 100644 --- a/protocol/messenger_filter_init.go +++ b/protocol/messenger_filter_init.go @@ -23,9 +23,18 @@ func (m *Messenger) InitFilters() error { rand.Seed(time.Now().Unix()) // Community requests will arrive in this pubsub topic + // TODO remove once fully migrated to Global Community Control and Content Topic https://github.com/status-im/status-go/issues/6384 if err := m.SubscribeToPubsubTopic(messagingtypes.DefaultNonProtectedPubsubTopic(), nil); err != nil { return err } + // TODO only subscribe if interested in communities + if err := m.SubscribeToPubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic(), nil); err != nil { + return err + } + // TODO only subscribe if interested in communities + if err := m.SubscribeToPubsubTopic(messagingtypes.GlobalCommunityContentPubsubTopic(), nil); err != nil { + return err + } filters, publicKeys, err := m.collectFiltersAndKeys() if err != nil { diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go index 2f29676d58d..f2b71dba3cc 100644 --- a/protocol/messenger_store_node_request_manager.go +++ b/protocol/messenger_store_node_request_manager.go @@ -84,46 +84,76 @@ func (m *StoreNodeRequestManager) FetchCommunity(ctx context.Context, community zap.Any("community", community), zap.Any("config", cfg)) - requestCommunity := func(communityID string, shard *messagingtypes.Shard) (*communities.Community, StoreNodeRequestStats, error) { - channel, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg) + fetch := func() (*communities.Community, StoreNodeRequestStats, error) { + if community.Shard != nil { + return m.fetchCommunity(ctx, community.CommunityID, cfg, community.Shard) + } + communityCustomShard, err := m.fetchCustomShard(ctx, community.CommunityID, cfg, messagingtypes.DefaultNonProtectedShard(), messagingtypes.GlobalCommunityControlShard()) if err != nil { - return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err) + return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to fetch community shard: %w", err) } - - if !cfg.WaitForResponse { - return nil, StoreNodeRequestStats{}, nil + if communityCustomShard != nil { + return m.fetchCommunity(ctx, community.CommunityID, cfg, communityCustomShard) } + // request community in 32 and 128 shards + return m.fetchCommunity(ctx, community.CommunityID, cfg, messagingtypes.DefaultShard(), messagingtypes.GlobalCommunityControlShard()) + } - result := <-channel - return result.community, result.stats, result.err + if !cfg.WaitForResponse { + go func() { + defer gocommon.LogOnPanic() + _, _, err := fetch() + if err != nil { + m.logger.Error("failed to fetch community", zap.Error(err)) + } + }() + return nil, StoreNodeRequestStats{}, nil } + return fetch() +} - // if shard was not passed or nil, request shard first - communityShard := community.Shard - if communityShard == nil { - id := messaging.CommunityShardInfoTopic(community.CommunityID) - fetchedShard, err := m.subscribeToRequest(ctx, storeNodeShardRequest, id, messagingtypes.DefaultNonProtectedShard(), cfg) +// fetchCommunity fetches a community from the store node. +// +// Since now we could have this community description in shards 32 and 128 we are doing the following: +// 1. trying to fetch in shards 32 and 128 +// 2. returning the first non-nil community result +// +// Eventually we should just go to shard 128, once full migration to Global Community Control and Content Topic is done. +func (m *StoreNodeRequestManager) fetchCommunity(ctx context.Context, communityID string, cfg StoreNodeRequestConfig, shards ...*messagingtypes.Shard) (*communities.Community, StoreNodeRequestStats, error) { + for _, shard := range shards { + sub, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err) } - - if !cfg.WaitForResponse { - go func() { - defer gocommon.LogOnPanic() - shardResult := <-fetchedShard - communityShard = shardResult.shard - - _, _, _ = requestCommunity(community.CommunityID, communityShard) - }() - return nil, StoreNodeRequestStats{}, nil + // not doing this concurrently to avoid two filters for the same communityID, not a big deal since it is at most 2 shards and temporary + res := <-sub + if res.community != nil { + return res.community, res.stats, res.err } - - shardResult := <-fetchedShard - communityShard = shardResult.shard } + return nil, StoreNodeRequestStats{}, nil +} - // request community with on shard - return requestCommunity(community.CommunityID, communityShard) +// fetchCustomShard fetches a community custom shard from the store node. +// +// Since now we could have communities' shard info in shards 64 and 128 we are doing the following: +// 1. trying to fetch in shards 64 and 128 +// 2. returning the first non-nil shard result +// +// Eventually we should just go to shard 128, once full migration to Global Community Control and Content Topic is done. +func (m *StoreNodeRequestManager) fetchCustomShard(ctx context.Context, communityID string, cfg StoreNodeRequestConfig, shards ...*messagingtypes.Shard) (*messagingtypes.Shard, error) { + for _, shard := range shards { + sub, err := m.subscribeToRequest(ctx, storeNodeShardRequest, messaging.CommunityShardInfoTopic(communityID), shard, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create a shard info request: %w", err) + } + // not doing this concurrently to avoid two filters for the same CommunityShardInfoTopic, not a big deal since it is at most 2 shards and temporary + res := <-sub + if res.shard != nil { + return res.shard, nil + } + } + return nil, nil } // FetchContact - similar to FetchCommunity