Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str

protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean authoritative,
Map<String, String> properties) {
CompletableFuture<Void> ret = validateNonPartitionTopicNameAsync(topicName.getLocalName());
CompletableFuture<Void> ret = namespaceResources().namespaceExistsAsync(namespaceName)
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "V1 namespace [" + namespaceName + "] does not exist");
}
}).thenCompose(__ -> validateNonPartitionTopicNameAsync(topicName.getLocalName()));
if (topicName.isGlobal()) {
ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void getTopics(@Suspended AsyncResponse response,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> internalGetListOfTopics(policies, mode))
Expand Down Expand Up @@ -292,7 +292,8 @@ public void getPermissions(@Suspended AsyncResponse response,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION)
validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace),
NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(namespaceName))
.thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
Expand Down Expand Up @@ -823,7 +824,7 @@ public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, namespace),
.thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace),
NamespaceOperation.GET_BUNDLE))
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.bundles))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,12 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) {
}
});
}
return CompletableFuture.completedFuture(null);
return namespaceResources().namespaceExistsAsync(namespaceName)
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "Namespace [" + namespaceName + "] does not exist");
}
});
}

public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,26 @@ public void cleanup() throws Exception {
conf.setClusterName(configClusterName);
}

private void createNamespaceIfAbsent(TopicName topicName) throws Exception {
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
NamespaceName namespaceName = topicName.getNamespaceObject();
if (!namespaceName.isV2()) {
if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) {
admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder()
.brokerServiceUrl(pulsar.getBrokerServiceUrl()).build());
}
tenantInfo.getAllowedClusters().add(namespaceName.getCluster());
}
if (!admin.tenants().getTenants().contains(topicName.getTenant())) {
admin.tenants().createTenant(topicName.getTenant(), tenantInfo);
}
try {
admin.namespaces().createNamespace(topicName.getNamespace());
} catch (Exception ex) {
// Namespace may already exist.
}
}

@Test
public void internalConfiguration() throws Exception {
ServiceConfiguration conf = pulsar.getConfiguration();
Expand Down Expand Up @@ -948,7 +968,7 @@ public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception {
@Test
public void test500Error() throws Exception {
final String property = "prop-xyz";
final String cluster = "use";
final String cluster = pulsar.getConfig().getClusterName();
final String namespace = "ns";
final String partitionedTopicName = "error-500-topic";
AsyncResponse response1 = mock(AsyncResponse.class);
Expand All @@ -958,6 +978,7 @@ public void test500Error() throws Exception {
NamespaceService namespaceService = pulsar.getNamespaceService();

doReturn(future).when(namespaceService).checkTopicExists(any());
createNamespaceIfAbsent(TopicName.get("persistent", property, cluster, namespace, partitionedTopicName));
persistentTopics.createPartitionedTopic(response1, property, cluster, namespace,
partitionedTopicName, 5, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AdminTopicApiTest extends ProducerConsumerBase {
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
admin.namespaces().createNamespace("my-property/test/my-ns");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
Expand Down Expand Up @@ -635,4 +636,37 @@ public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}

@Test
public void testTopicNameContainsSpecialCharacters() throws Exception {
// V1 topic create failed because v1 namespace does not exist.
String topic = "persistent://public/default/tp/h";
try {
admin1.topics().createNonPartitionedTopic(topic);
fail("Expected a namespace not found ex, since the v1 namespace does not exist");
} catch (PulsarAdminException.NotFoundException ex) {
assertTrue(ex.getMessage().contains("does not exist"));
}
try {
admin1.topics().createPartitionedTopic(topic, 1);
fail("Expected a namespace not found ex, since the v1 namespace does not exist");
} catch (PulsarAdminException.NotFoundException ex) {
assertTrue(ex.getMessage().contains("does not exist"));
}

// The topic can be created after v1 namespace was created
String v1Ns = "public/" + pulsar1.getConfig().getClusterName() + "/default";
admin1.namespaces().createNamespace(v1Ns);
String topic2 = "persistent://" + v1Ns + "/h";
String topic3 = "persistent://" + v1Ns + "/h2";
admin1.topics().createNonPartitionedTopic(topic2);
admin1.topics().createPartitionedTopic(topic3, 1);
List<String> v1Topics = admin1.topics().getList(v1Ns);
assertTrue(v1Topics.contains(topic2));
assertTrue(v1Topics.contains(topic3 + "-partition-0"));

// cleanup.
admin1.topics().delete(topic2, false);
admin1.topics().deletePartitionedTopic(topic3, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,26 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal);
}

private void createNamespaceIfAbsent(TopicName topicName) throws Exception {
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
NamespaceName namespaceName = topicName.getNamespaceObject();
if (!namespaceName.isV2()) {
if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) {
admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder()
.brokerServiceUrl(pulsar.getBrokerServiceUrl()).build());
}
tenantInfo.getAllowedClusters().add(namespaceName.getCluster());
}
if (!admin.tenants().getTenants().contains(topicName.getTenant())) {
admin.tenants().createTenant(topicName.getTenant(), tenantInfo);
}
try {
admin.namespaces().createNamespace(topicName.getNamespace());
} catch (Exception ex) {
// Namespace may already exist.
}
}

@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
Expand Down Expand Up @@ -715,15 +735,19 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
final String partitionedTopicName = "special-topic";
pulsar.getDefaultManagedLedgerFactory()
.open(TopicName.get(nonPartitionTopicName2).getPersistenceNamingEncoding());
final TopicName topicName = TopicName.get("persistent", testTenant,
pulsar.getConfig().getClusterName(), testNamespace, partitionedTopicName);
createNamespaceIfAbsent(topicName);
doAnswer(invocation -> {
persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
persistentTopics.namespaceName = NamespaceName.get(testTenant, testNamespace);
persistentTopics.topicName = topicName;
return null;
}).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());

doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);

persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand All @@ -70,6 +73,26 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

private void createNamespaceIfAbsent(TopicName topicName) throws Exception {
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
NamespaceName namespaceName = topicName.getNamespaceObject();
if (!namespaceName.isV2()) {
if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) {
admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder()
.brokerServiceUrl(pulsar.getBrokerServiceUrl()).build());
}
tenantInfo.getAllowedClusters().add(namespaceName.getCluster());
}
if (!admin.tenants().getTenants().contains(topicName.getTenant())) {
admin.tenants().createTenant(topicName.getTenant(), tenantInfo);
}
try {
admin.namespaces().createNamespace(topicName.getNamespace());
} catch (Exception ex) {
// Namespace may already exist.
}
}

private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;

private static class TestConsumerStateEventListener implements ConsumerEventListener {
Expand Down Expand Up @@ -169,7 +192,9 @@ FailoverConsumer createConsumer(String topicName, String subName, String listene

@Test
public void testSimpleConsumerEventsWithoutPartition() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis();
final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName()
+ "/ns-abc/failover-topic1-" + System.currentTimeMillis();
createNamespaceIfAbsent(TopicName.get(topicName));
final String subName = "sub1";
final int numMsgs = 100;

Expand Down Expand Up @@ -314,7 +339,9 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
int numPartitions = 4;

final String topicName = BrokerTestUtil.newUniqueName(
"persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition");
"persistent://prop/" + pulsar.getConfig().getClusterName()
+ "/ns-abc/testSimpleConsumerEventsWithPartition");
createNamespaceIfAbsent(TopicName.get(topicName));
final TopicName destName = TopicName.get(topicName);
final String subName = "sub1";
final int numMsgs = 100;
Expand Down Expand Up @@ -501,7 +528,8 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {

@Test
public void testActiveConsumerFailoverWithDelay() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/failover-topic3";
final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/ns-abc/failover-topic3";
createNamespaceIfAbsent(TopicName.get(topicName));
final String subName = "sub1";
final int numMsgs = 100;
List<Message<byte[]>> receivedMessages = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
Expand All @@ -58,11 +59,28 @@ protected void cleanup() throws Exception {
internalCleanup();
}

private void createNamespaceIfAbsent(TopicName topicName) throws Exception {
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
NamespaceName namespaceName = topicName.getNamespaceObject();
if (!namespaceName.isV2()) {
tenantInfo.getAllowedClusters().add(namespaceName.getCluster());
}
if (!admin.tenants().getTenants().contains(topicName.getTenant())) {
admin.tenants().createTenant(topicName.getTenant(), tenantInfo);
}
try {
admin.namespaces().createNamespace(topicName.getNamespace());
} catch (Exception ex) {
// Namespace may already exist.
}
}

@Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
public void producerSendAsync(TopicType topicType) throws PulsarClientException, PulsarAdminException {
public void producerSendAsync(TopicType topicType) throws Exception {
// Given
String key = "producerSendAsync-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/namespace/topic-" + key;
createNamespaceIfAbsent(TopicName.get(topicName));
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
Expand Down Expand Up @@ -126,15 +144,17 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
}

@Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
public void producerSend(TopicType topicType) throws PulsarClientException, PulsarAdminException {
public void producerSend(TopicType topicType) throws Exception {
// Given
String key = "producerSend-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/namespace/topic-" + key;
createNamespaceIfAbsent(TopicName.get(topicName));
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
if (topicType == TopicType.PARTITIONED) {
int numberOfPartitions = 7;
createNamespaceIfAbsent(TopicName.get(topicName));
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -66,6 +69,15 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Override
public void setup() throws Exception {
baseSetup();
admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Collections.emptySet(), Set.of("use"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
TenantInfo tenantInfo1 = admin.tenants().getTenantInfo("prop");
tenantInfo1.getAllowedClusters().add("use");
admin.tenants().updateTenant("prop", tenantInfo1);
admin.namespaces().createNamespace("prop-xyz/use/ns-abc");
admin.namespaces().createNamespace("prop/use/ns-abc");
}

@AfterClass(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,8 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
final String topicOne = "not-partitioned-topic";
final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic";

admin.namespaces().createNamespace(tenant + "/" + cluster + "/" + namespace);

// persistent, non-partitioned v1/topic
final String topic1 = TopicName.get(
TopicDomain.persistent.value(),
Expand Down
Loading
Loading