Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.type.TypeReference;
import io.github.openfacade.http.HttpResponse;
import io.github.protocol.pulsar.admin.common.JacksonService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PersistentTopics extends BaseTopicsImpl {

private static final String BASE_URL_PERSISTENT_DOMAIN = "/admin/v2" + "/persistent";
Expand All @@ -11,4 +19,58 @@ public PersistentTopics(InnerHttpClient httpClient) {
public String getDomainBaseUrl() {
return BASE_URL_PERSISTENT_DOMAIN;
}

public void createSubscription(String tenant, String namespace, String encodedTopic, String subscriptionName,
boolean replicated, boolean authoritative, SubscriptionMessageId messageId)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
subscriptionName);
try {
HttpResponse response =
httpClient.put(url, messageId, "replicated", String.valueOf(replicated), "authoritative",
String.valueOf(authoritative));
if (response.statusCode() != 204) {
throw new PulsarAdminException(
String.format("failed to create subscription %s for topic %s/%s/%s, status code %s, body : %s",
subscriptionName, tenant, namespace, encodedTopic, response.statusCode(),
response.bodyAsString()));
}
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}

public void deleteSubscription(String tenant, String namespace, String encodedTopic, String subName, boolean force,
boolean authoritative) throws PulsarAdminException {
String url =
String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, subName);
try {
HttpResponse response =
httpClient.delete(url, "force", String.valueOf(force), "authoritative", String.valueOf(authoritative));
if (response.statusCode() != 204) {
throw new PulsarAdminException(
String.format("failed to delete subscription %s of topic %s/%s/%s, status code %s, body : %s",
subName, tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
}
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}

public List<String> getSubscriptions(String tenant, String namespace, String encodedTopic, boolean authoritative)
throws PulsarAdminException {
String url = String.format("%s/%s/%s/%s/subscriptions", getDomainBaseUrl(), tenant, namespace, encodedTopic);
try {
HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative));
if (response.statusCode() != 200) {
throw new PulsarAdminException(
String.format("failed to get subscriptions of topic %s/%s/%s, status code %s, body : %s", tenant,
namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.protocol.pulsar.admin.jdk;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Map;

@Getter
@Setter
@NoArgsConstructor
@EqualsAndHashCode
public class SubscriptionMessageId {

private Integer batchIndex = -1;

private Long entryId = -1L;

private Long ledgerId = -1L;

private Integer partitionIndex = -1;

private Map<String, String> properties = null;

public static SubscriptionMessageId earliest() {
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
return subscriptionMessageId;
}

public static SubscriptionMessageId latest() {
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
subscriptionMessageId.setEntryId(Long.MAX_VALUE);
subscriptionMessageId.setLedgerId(Long.MAX_VALUE);
subscriptionMessageId.setPartitionIndex(Integer.MAX_VALUE);
return subscriptionMessageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.TreeMap;

public class PersistentTopicsTest extends BaseTest {
Expand Down Expand Up @@ -197,4 +198,49 @@ public void getPartitionedStatsTest(PulsarAdmin pulsarAdmin) throws PulsarAdminE
Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false));
}

@ParameterizedTest
@MethodSource("providePulsarAdmins")
public void subscriptionTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
String namespace = RandomUtil.randomString();
String topic = RandomUtil.randomString();
String subscriptionNameLatest = RandomUtil.randomString();
String subscriptionNameEarliest = RandomUtil.randomString();

// Create namespace and topic
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
pulsarAdmin.persistentTopics().createNonPartitionedTopic(tenant, namespace, topic, false, null);

// Create subscription with message ID latest and earliest
pulsarAdmin.persistentTopics()
.createSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false,
SubscriptionMessageId.latest());
pulsarAdmin.persistentTopics()
.createSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false,
SubscriptionMessageId.earliest());

// Verify subscription was created
List<String> subscriptions = pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, topic, false);
Assertions.assertTrue(subscriptions.contains(subscriptionNameEarliest),
"Should contain subscription created with message ID");
Assertions.assertTrue(subscriptions.contains(subscriptionNameLatest),
"Should contain subscription created with message ID");

// test create subscription invalid
Assertions.assertThrows(PulsarAdminException.class,
() -> pulsarAdmin.persistentTopics().createSubscription(tenant, namespace, topic, "", false, false, null));

// test get subscription, topic invalid
Assertions.assertThrows(PulsarAdminException.class,
() -> pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, "", false));

// test delete subscription invalid
Assertions.assertThrows(PulsarAdminException.class,
() -> pulsarAdmin.persistentTopics().deleteSubscription(tenant, namespace, topic, "", true, false));

// Clean up
pulsarAdmin.persistentTopics()
.deleteSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false);
pulsarAdmin.persistentTopics()
.deleteSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false);
}
}