diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java index 98e72228..4493b932 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java @@ -4,6 +4,8 @@ import com.openelements.hiero.base.data.Account; import org.jspecify.annotations.NonNull; +import java.util.Set; + /** * Context for a specific Hiero connection to a network. */ @@ -25,4 +27,7 @@ public interface HieroContext { */ @NonNull Client getClient(); + + @NonNull + Set getMirrorNodeEndPoint(); } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java index bdd9c5d5..68284e0d 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java @@ -1,10 +1,14 @@ package com.openelements.hiero.base; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import org.jspecify.annotations.NonNull; +import java.time.Instant; import java.util.Objects; +import java.util.function.Consumer; /** * Interface for interacting with a Hiero network. This interface provides methods for interacting with Hiero Topic, @@ -317,4 +321,106 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @ Objects.requireNonNull(message, "message cannot be null"); submitMessage(TopicId.fromString(topicId), PrivateKey.fromString(submitKey), message); }; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) + throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription) + throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription); + } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + long limit) throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + long limit) throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, limit); + } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime); + } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime, limit); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java index 02dc41d5..d88b6f05 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java @@ -41,6 +41,9 @@ public interface HieroConfig { @NonNull Set getMirrorNodeAddresses(); + @NonNull + Set getConsensusServiceAddress(); + /** * Returns the consensus nodes. * @@ -88,6 +91,11 @@ default HieroContext createHieroContext() { public @NonNull Client getClient() { return client; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return getMirrorNodeAddresses(); + } }; } @@ -102,7 +110,7 @@ default Client createClient() { final Map nodes = getConsensusNodes().stream() .collect(Collectors.toMap(n -> n.getAddress(), n -> n.getAccountId())); final Client client = Client.forNetwork(nodes); - final List mirrorNodeAddresses = getMirrorNodeAddresses().stream().collect(Collectors.toList()); + final List mirrorNodeAddresses = getConsensusServiceAddress().stream().collect(Collectors.toList()); client.setMirrorNetwork(mirrorNodeAddresses); client.setOperator(getOperatorAccount().accountId(), getOperatorAccount().privateKey()); getRequestTimeout().ifPresent(client::setRequestTimeout); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java index b8516097..3b7b7442 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java @@ -40,6 +40,14 @@ public interface NetworkSettings { @NonNull Set getMirrorNodeAddresses(); + /** + * Returns the consensus service address. + * + * @return the consensus service addresses + */ + @NonNull + Set getConsensusServiceAddress(); + /** * Returns the consensus nodes. * diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java index 4c0f8a4e..051c36e0 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java @@ -31,6 +31,9 @@ public final class HederaMainnetSettings implements NetworkSettings { return Set.of("https://mainnet.mirrornode.hedera.com:443"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("mainnet.mirrornode.hedera.com:443");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("35.186.191.247", "50211", "0.0.4")); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java index 5b4bd7c7..9639ce7e 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java @@ -31,6 +31,9 @@ public final class HederaTestnetSettings implements NetworkSettings { return Set.of("https://testnet.mirrornode.hedera.com:443"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("testnet.mirrornode.hedera.com:443");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("0.testnet.hedera.com", "50211", "0.0.3")); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java index 081f74fe..27f0a2b9 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java @@ -17,6 +17,7 @@ public class EnvBasedHieroConfig implements HieroConfig { private final PrivateKey operatorPrivateKey; private final String mirrorNodeAddress; + private final String consensusServiceAddress; private final String consensusNodeIp; private final String consensusNodePort; private final String consensusNodeAccount; @@ -37,6 +38,8 @@ public EnvBasedHieroConfig() { .orElseThrow(() -> new IllegalStateException("HEDERA_OPERATOR_PRIVATE_KEY is not set")); mirrorNodeAddress = getEnv("HEDERA_MIRROR_NODE_ADDRESS") .orElse(null); + consensusServiceAddress = getEnv("HEDERA_CONSENSUS_SERVICE_ADDRESS") + .orElseThrow(() -> new IllegalStateException("HEDERA_CONSENSUS_SERVICE_ADDRESS is not set")); consensusNodeIp = getEnv("HEDERA_CONSENSUS_NODE_IP") .orElseThrow(() -> new IllegalStateException("HEDERA_CONSENSUS_NODE_IP is not set")); consensusNodePort = getEnv("HEDERA_CONSENSUS_NODE_PORT") @@ -71,6 +74,11 @@ private Optional getEnv(String key) { return Set.of(mirrorNodeAddress); } + @Override + public @NonNull Set getConsensusServiceAddress() { + return Set.of(consensusServiceAddress); + } + @Override public @NonNull Set getConsensusNodes() { ConsensusNode node = new ConsensusNode( diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java index 943d5965..caa57050 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java @@ -37,6 +37,11 @@ public NetworkSettingsBasedHieroConfig(@NonNull final Account operatorAccount, @ return networkSetting.getMirrorNodeAddresses(); } + @Override + public @NonNull Set getConsensusServiceAddress() { + return networkSetting.getConsensusServiceAddress(); + } + @Override public @NonNull Set getConsensusNodes() { return networkSetting.getConsensusNodes(); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java index 343341d1..8af777b2 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java @@ -434,7 +434,7 @@ public TopicMessageResult executeTopicMessageQuery(TopicMessageRequest request) query.setLimit(request.limit()); } final SubscriptionHandle subscribe = query.subscribe(hieroContext.getClient(), request.subscription()); - return new TopicMessageResult(); + return new TopicMessageResult(subscribe); } catch (final Exception e) { throw new HieroException("Failed to execute query message transaction", e); } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java index bc060faa..de1172a6 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java @@ -1,15 +1,25 @@ package com.openelements.hiero.base.implementation; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.TopicClient; import com.openelements.hiero.base.data.Account; import com.openelements.hiero.base.protocol.ProtocolLayerClient; -import com.openelements.hiero.base.protocol.data.*; +import com.openelements.hiero.base.protocol.data.TopicCreateRequest; +import com.openelements.hiero.base.protocol.data.TopicCreateResult; +import com.openelements.hiero.base.protocol.data.TopicUpdateRequest; +import com.openelements.hiero.base.protocol.data.TopicDeleteRequest; +import com.openelements.hiero.base.protocol.data.TopicSubmitMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageResult; import org.jspecify.annotations.NonNull; +import java.time.Instant; import java.util.Objects; +import java.util.function.Consumer; public class TopicClientImpl implements TopicClient { private final ProtocolLayerClient client; @@ -107,7 +117,7 @@ public void updateTopic(@NonNull TopicId topicId, @NonNull PrivateKey updatedAdm Objects.requireNonNull(topicId, "topicId must not be null"); Objects.requireNonNull(submitKey, "submitKey must not be null"); Objects.requireNonNull(memo, "memo must not be null"); - updateTopic(topicId, operationalAccount.privateKey(), updatedAdminKey, submitKey, memo); + updateTopic(topicId, operationalAccount.privateKey(), updatedAdminKey, submitKey, memo); } @Override @@ -202,4 +212,72 @@ public void submitMessage(@NonNull TopicId topicId, @NonNull PrivateKey submitKe TopicSubmitMessageRequest request = TopicSubmitMessageRequest.of(topicId, submitKey, message); client.executeTopicMessageSubmitTransaction(request); } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) + throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + long limit) throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + if (limit == 0) { + throw new IllegalArgumentException("limit must be greater than 0"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, limit); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + Instant startTime, Instant endTime) throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + Objects.requireNonNull(startTime, "startTime must not be null"); + Objects.requireNonNull(endTime, "endTime must not be null"); + + if (startTime.isBefore(Instant.now())) { + throw new IllegalArgumentException("startTime must be greater than currentTime"); + } + if (endTime.isBefore(startTime)) { + throw new IllegalArgumentException("endTime must be greater than startTime"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + Objects.requireNonNull(startTime, "startTime must not be null"); + Objects.requireNonNull(endTime, "endTime must not be null"); + + if (startTime.isBefore(Instant.now())) { + throw new IllegalArgumentException("startTime must be greater than currentTime"); + } + if (endTime.isBefore(startTime)) { + throw new IllegalArgumentException("endTime must be greater than startTime"); + } + if (limit == 0) { + throw new IllegalArgumentException("limit must be greater than 0"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime, limit); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java index dca2c425..e75d41d1 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java @@ -26,4 +26,22 @@ public record TopicMessageRequest(@NonNull TopicId topicId, @NonNull Consumer subscription) { return new TopicMessageRequest(topicId, subscription, null, null, NO_LIMIT, null, null); } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull long limit) { + return new TopicMessageRequest(topicId, subscription, null, null, limit, null, null); + } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) { + return new TopicMessageRequest(topicId, subscription, startTime, endTime, NO_LIMIT, null, null); + } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) { + return new TopicMessageRequest(topicId, subscription, startTime, endTime, limit, null, null); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java index 29781979..ecc31d24 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java @@ -1,4 +1,14 @@ package com.openelements.hiero.base.protocol.data; -public record TopicMessageResult() { +import com.hedera.hashgraph.sdk.Status; +import com.hedera.hashgraph.sdk.SubscriptionHandle; +import com.hedera.hashgraph.sdk.TransactionId; +import org.jspecify.annotations.NonNull; + +import java.util.Objects; + +public record TopicMessageResult(@NonNull SubscriptionHandle subscriptionHandle) { + public TopicMessageResult { + Objects.requireNonNull(subscriptionHandle, "subscriptionHandle must not be null"); + } } diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java index 9f42e887..30a0e724 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java @@ -11,6 +11,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Set; + public class ProtocolLayerClientTests { @Test @@ -31,6 +33,11 @@ void testNullParams() { public @NonNull Client getClient() { return null; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return null; + } }; final ProtocolLayerClient client = new ProtocolLayerClientImpl(context); diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java index bc94c452..784af499 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java @@ -1,7 +1,9 @@ package com.openelements.hiero.base.test; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.data.Account; import com.openelements.hiero.base.implementation.TopicClientImpl; @@ -14,12 +16,17 @@ import com.openelements.hiero.base.protocol.data.TopicDeleteResult; import com.openelements.hiero.base.protocol.data.TopicSubmitMessageRequest; import com.openelements.hiero.base.protocol.data.TopicSubmitMessageResult; +import com.openelements.hiero.base.protocol.data.TopicMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageResult; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.time.Instant; +import java.util.function.Consumer; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -33,6 +40,7 @@ public class TopicClientImplTest { ArgumentCaptor topicUpdateCaptor = ArgumentCaptor.forClass(TopicUpdateRequest.class); ArgumentCaptor topicDeleteCaptor = ArgumentCaptor.forClass(TopicDeleteRequest.class); ArgumentCaptor topicSubmitCaptor = ArgumentCaptor.forClass(TopicSubmitMessageRequest.class); + ArgumentCaptor topicSubscribeCaptor = ArgumentCaptor.forClass(TopicMessageRequest.class); @BeforeEach void setup() { @@ -587,4 +595,180 @@ void shouldThrowExceptionForNullParamOnSubmitMessage() { Assertions.assertThrows(NullPointerException.class, () -> topicClient.submitMessage((TopicId) null, (String)null)); Assertions.assertThrows(NullPointerException.class, () -> topicClient.submitMessage((TopicId) null, null, (String)null)); } + + @Test + void shouldSubscribeTopic() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(-1, capture.limit()); // default limit infinite(-1) + Assertions.assertNull(capture.startTime()); + Assertions.assertNull(capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithLimit() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final int limit = 2; + final Consumer subscription = (message) -> {}; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, limit); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(limit, capture.limit()); + Assertions.assertNull(capture.startTime()); + Assertions.assertNull(capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithStartAndEndTime() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = Instant.now().plusSeconds(1800); + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, startTime, endTime); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(-1, capture.limit()); // default limit + Assertions.assertEquals(startTime, capture.startTime()); + Assertions.assertEquals(endTime, capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithAllParams() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = Instant.now().plusSeconds(1800); + final int limit = 1; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, startTime, endTime, limit); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(limit, capture.limit()); + Assertions.assertEquals(startTime, capture.startTime()); + Assertions.assertEquals(endTime, capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldThrowExceptionOnSubscribeTopicWithInvalidStartAndEndTime() { + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + + final Instant startTime1 = Instant.now().plusSeconds(120); + final Instant endTime1 = startTime1.minusSeconds(60); + final Instant startTime2 = Instant.now().minusSeconds(60); + final Instant endTime2 = Instant.now().plusSeconds(120); + final int limit = 1; + + //End time before start time + final IllegalArgumentException e1 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime1, endTime1)); + final IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime1, endTime1, limit)); + + Assertions.assertEquals("endTime must be greater than startTime", e1.getMessage()); + Assertions.assertEquals("endTime must be greater than startTime", e2.getMessage()); + + //Start time before current time + final IllegalArgumentException e3 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime2, endTime2)); + final IllegalArgumentException e4 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime2, endTime2, limit)); + + Assertions.assertEquals("startTime must be greater than currentTime", e3.getMessage()); + Assertions.assertEquals("startTime must be greater than currentTime", e4.getMessage()); + } + + @Test + void shouldThrowExceptionOnSubscribeTopicWithLimitEqualsZero() { + final String msg = "limit must be greater than 0"; + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = startTime.plusSeconds(120); + final int limit = 0; + + final IllegalArgumentException e1 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, limit)); + final IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime, endTime, limit)); + + Assertions.assertEquals(msg, e1.getMessage()); + Assertions.assertEquals(msg, e2.getMessage()); + } } diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java index 05a844c4..b53e2481 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java @@ -11,6 +11,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; + import org.jspecify.annotations.NonNull; import org.slf4j.Logger; @@ -22,6 +24,8 @@ public class HieroTestContext implements HieroContext { private final Client client; + private final Set mirronNodeEndpoint; + public HieroTestContext() { final Dotenv dotenv = Dotenv.configure().ignoreIfMissing().load(); @@ -57,6 +61,7 @@ public HieroTestContext() { final Map nodes = new HashMap<>(); networkSettings.getConsensusNodes() .forEach(consensusNode -> nodes.put(consensusNode.getAddress(), consensusNode.getAccountId())); + mirronNodeEndpoint = networkSettings.getConsensusServiceAddress(); client = Client.forNetwork(nodes); if (!networkSettings.getMirrorNodeAddresses().isEmpty()) { try { @@ -76,4 +81,9 @@ public HieroTestContext() { public Client getClient() { return client; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return mirronNodeEndpoint; + } } diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java index f63b7384..139ca643 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java @@ -24,6 +24,9 @@ public class SoloActionNetworkSettings implements NetworkSettings { return Set.of("http://localhost:8080"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("127.0.0.1", "50211", "0.0.3")); diff --git a/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java b/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java index 0141ed19..6415079a 100644 --- a/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java +++ b/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java @@ -27,6 +27,8 @@ public class HieroConfigImpl implements HieroConfig { private final Set mirrorNodeAddresses; + private final Set consensusServiceAddress; + private final Set consensusNodes; private final Long chainId; @@ -51,12 +53,14 @@ public HieroConfigImpl(@NonNull final HieroOperatorConfiguration configuration, final NetworkSettings settings = networkSettings.get(); networkName = settings.getNetworkName().orElse(networkConfiguration.getName().orElse(null)); mirrorNodeAddresses = Collections.unmodifiableSet(settings.getMirrorNodeAddresses()); + consensusServiceAddress = Collections.unmodifiableSet(settings.getConsensusServiceAddress()); consensusNodes = Collections.unmodifiableSet(settings.getConsensusNodes()); chainId = settings.chainId().orElse(null); relayUrl = settings.relayUrl().orElse(null); } else { networkName = networkConfiguration.getName().orElse(null); mirrorNodeAddresses = networkConfiguration.getMirrornode().map(Set::of).orElse(Set.of()); + consensusServiceAddress = Set.of(); consensusNodes = Collections.unmodifiableSet(networkConfiguration.getNodes()); chainId = null; relayUrl = null; @@ -83,6 +87,11 @@ public Optional getRequestTimeout() { return mirrorNodeAddresses; } + @Override + public @NonNull Set getConsensusServiceAddress() { + return consensusServiceAddress; + } + @Override public @NonNull Set getConsensusNodes() { return consensusNodes; diff --git a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java index 35879b71..c1745ca4 100644 --- a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java +++ b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java @@ -34,6 +34,8 @@ import java.net.URI; import java.net.URL; import java.util.List; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -110,7 +112,7 @@ TopicClient topicClient(final ProtocolLayerClient protocolLayerClient, HieroCont havingValue = "true", matchIfMissing = true) MirrorNodeClient mirrorNodeClient(final HieroContext hieroContext) { final String mirrorNodeEndpoint; - final List mirrorNetwork = hieroContext.getClient().getMirrorNetwork(); + final List mirrorNetwork = hieroContext.getMirrorNodeEndPoint().stream().toList(); if (mirrorNetwork.isEmpty()) { throw new IllegalArgumentException("Mirror node endpoint must be set"); } diff --git a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java index c3954510..60f21e9c 100644 --- a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java +++ b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java @@ -23,6 +23,8 @@ public class HieroConfigImpl implements HieroConfig { private final Set mirrorNodeAddresses; + private final Set consensusServiceAddress; + private final Set consensusNodes; private final Long chainId; @@ -50,6 +52,7 @@ public HieroConfigImpl(@NonNull final HieroProperties properties) { final NetworkSettings settings = networkSettings.get(); networkName = settings.getNetworkName().orElse(properties.getNetwork().getName()); mirrorNodeAddresses = Collections.unmodifiableSet(settings.getMirrorNodeAddresses()); + consensusServiceAddress = Collections.unmodifiableSet(settings.getConsensusServiceAddress()); consensusNodes = Collections.unmodifiableSet(settings.getConsensusNodes()); chainId = settings.chainId().orElse(null); relayUrl = settings.relayUrl().orElse(null); @@ -61,6 +64,7 @@ public HieroConfigImpl(@NonNull final HieroProperties properties) { } else { mirrorNodeAddresses = Set.of(); } + consensusServiceAddress = Set.of(); final List nodes = properties.getNetwork().getNodes(); if (nodes == null || nodes.isEmpty()) { consensusNodes = Set.of(); @@ -106,6 +110,11 @@ public Set getMirrorNodeAddresses() { return mirrorNodeAddresses; } + @Override + public @NonNull Set getConsensusServiceAddress() { + return consensusServiceAddress; + } + @Override public Set getConsensusNodes() { return consensusNodes; diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index 753a99c3..83234e91 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -1,19 +1,29 @@ package com.openelements.hiero.spring.test; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.TopicClient; +import com.openelements.hiero.test.HieroTestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + @SpringBootTest(classes = HieroTestConfig.class) public class TopicClientTest { @Autowired private TopicClient topicClient; + @Autowired + private HieroTestUtils hieroTestUtils; + @Test void testCreateTopic() throws HieroException { final TopicId topicId = topicClient.createTopic(); @@ -198,4 +208,107 @@ void testSubmitMessageThrowExceptionFroInvalidId() { final PrivateKey submitKey = PrivateKey.generateECDSA(); Assertions.assertThrows(HieroException.class, () -> topicClient.submitMessage(topicId, submitKey, message)); } + + @Test + void testSubscribeTopic() throws Exception { + final String msg = "Hello Hiero"; + final List messages = new ArrayList<>(); + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(new String(message.contents)); + }); + + topicClient.submitMessage(topicId, msg); + hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode + + Assertions.assertNotNull(handler); + Assertions.assertEquals(1, messages.size()); + Assertions.assertEquals(msg,messages.getFirst()); + handler.unsubscribe(); + } + + @Test + void testSubscribeTopicWithLimit() throws Exception { + final String msg = "Hello Hiero"; + final long limit = 1; + + final List messages = new ArrayList<>(); + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(new String(message.contents)); + }, limit); + + topicClient.submitMessage(topicId, msg); + hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode + + topicClient.submitMessage(topicId, msg); + hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode + + Assertions.assertNotNull(handler); + Assertions.assertEquals(limit, messages.size()); + handler.unsubscribe(); + } + + @Test + void testSubscribeTopicWithInvalidLimit() throws HieroException { + final String msg = "limit must be greater than 0"; + final long limit = 0; + + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {/**/}, limit) + ); + + Assertions.assertEquals(msg, e.getMessage()); + } + + @Test + void testSubscribeTopicWithStartAndEndTime() throws HieroException { + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + final Instant start = Instant.now().plus(Duration.ofMinutes(10)); + final Instant end = Instant.now().plus(Duration.ofDays(2)); + final SubscriptionHandle handler = Assertions.assertDoesNotThrow( + () -> topicClient.subscribeTopic(topicId, (message) -> {}, start, end) + ); + + Assertions.assertNotNull(handler); + handler.unsubscribe(); + } + + @Test + void testSubscribeTopicWithStartAndEndTimeWithInvalidParams() throws HieroException { + final TopicId topicId = topicClient.createTopic(); + // Start time before Current time + final Instant invalidStart = Instant.now().minus(Duration.ofMinutes(10)); + final Instant end = Instant.now().plus(Duration.ofDays(2)); + + final IllegalArgumentException e1 = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {}, invalidStart, end) + ); + + Assertions.assertEquals("startTime must be greater than currentTime", e1.getMessage()); + + // End time before than Start time + final Instant start = Instant.now().plus(Duration.ofMinutes(10)); + final Instant invalidEnd = start.minus(Duration.ofMinutes(1)); + + final IllegalArgumentException e2 = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {}, start, invalidEnd) + ); + + Assertions.assertEquals("endTime must be greater than startTime", e2.getMessage()); + } } diff --git a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java index 293eed4a..6cdfa64d 100644 --- a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java +++ b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java @@ -24,6 +24,9 @@ public class SoloActionNetworkSettings implements NetworkSettings { return Set.of("http://localhost:5551"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("localhost:5600");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("127.0.0.1", "50211", "0.0.3"));