diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
index 7ae9aeac71..b821319cd1 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java
@@ -31,7 +31,6 @@ import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarService;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -66,7 +65,6 @@ public class KafkaChannelInitializer extends ChannelInitializer {
     private final SslContextFactory.Server sslContextFactory;
     @Getter
     private final RequestStats requestStats;
-    private final OrderedScheduler sendResponseScheduler;
     private final LengthFieldPrepender lengthFieldPrepender;
@@ -82,7 +80,6 @@ public KafkaChannelInitializer(PulsarService pulsarService,
                                    EndPoint advertisedEndPoint,
                                    boolean skipMessagesWithoutIndex,
                                    RequestStats requestStats,
-                                   OrderedScheduler sendResponseScheduler,
                                    KafkaTopicManagerSharedState kafkaTopicManagerSharedState) {
         super();
         this.pulsarService = pulsarService;
@@ -102,7 +99,6 @@ public KafkaChannelInitializer(PulsarService pulsarService,
         } else {
             sslContextFactory = null;
         }
-        this.sendResponseScheduler = sendResponseScheduler;
         this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
         this.lengthFieldPrepender = new LengthFieldPrepender(4);
     }
@@ -129,7 +125,7 @@ public KafkaRequestHandler newCnx() throws Exception {
         return new KafkaRequestHandler(pulsarService, kafkaConfig,
             tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
             producePurgatory, fetchPurgatory,
-            enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
+            enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats,
             kafkaTopicManagerSharedState);
     }
@@ -142,7 +138,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
             producePurgatory, fetchPurgatory,
             enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
             new RequestStats(rootStatsLogger.scope(SERVER_SCOPE)),
-            sendResponseScheduler,
             kafkaTopicManagerSharedState);
     }
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
index de5f55bd3f..b021586837 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
@@ -34,7 +34,6 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -60,21 +59,20 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
     protected AtomicBoolean isActive = new AtomicBoolean(false);
     // Queue to make response get responseFuture in order and limit the max request size
     private final LinkedBlockingQueue requestQueue;
+    private int numQueuedRequestsInProgress;
+    private final int maxQueuedRequests;
     @Getter
     @Setter
     protected volatile RequestStats requestStats;
     @Getter
     protected final KafkaServiceConfiguration kafkaConfig;
-    private final OrderedScheduler sendResponseScheduler;
-
     public KafkaCommandDecoder(RequestStats requestStats,
-                               KafkaServiceConfiguration kafkaConfig,
-                               OrderedScheduler sendResponseScheduler) {
+                               KafkaServiceConfiguration kafkaConfig) {
         this.requestStats = requestStats;
         this.kafkaConfig = kafkaConfig;
         this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
-        this.sendResponseScheduler = sendResponseScheduler;
+        this.maxQueuedRequests = kafkaConfig.getMaxQueuedRequests();
     }
 
     @Override
@@ -82,6 +80,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
         this.remoteAddress = ctx.channel().remoteAddress();
         this.ctx = ctx;
+        this.numQueuedRequestsInProgress = 0;
         isActive.set(true);
     }
@@ -230,7 +229,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             final long timeBeforeParse = MathUtils.nowInNano();
             KafkaHeaderAndRequest kafkaHeaderAndRequest = byteBufToRequest(buffer, remoteAddress);
-            // potentially blocking until there is room in the queue for the request.
             registerRequestParseLatency.accept(timeBeforeParse, null);
 
             try {
@@ -242,6 +240,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             CompletableFuture responseFuture = new CompletableFuture<>();
             final long startProcessRequestTimestamp = MathUtils.nowInNano();
+            // this callback is just meant to make sure that we roll through things, but it feels racy
             responseFuture.whenComplete((response, e) -> {
                 if (e instanceof CancellationException) {
                     if (log.isDebugEnabled()) {
@@ -256,13 +255,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                 registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey(),
                         startProcessRequestTimestamp);
-                sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
+                ctx.channel().eventLoop().execute(() -> {
                     writeAndFlushResponseToClient(channel);
                 });
             });
-            // potentially blocking until there is room in the queue for the request.
             requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
+            numQueuedRequestsInProgress++;
             RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.incrementAndGet();
+            if (numQueuedRequestsInProgress == maxQueuedRequests) {
+                channel.config().setAutoRead(false);
+            }
 
             if (!isActive.get()) {
                 handleInactive(kafkaHeaderAndRequest, responseFuture);
@@ -397,7 +399,12 @@ protected void writeAndFlushResponseToClient(Channel channel) {
             } else {
                 if (requestQueue.remove(responseAndRequest)) {
                     RequestStats.REQUEST_QUEUE_SIZE_INSTANCE.decrementAndGet();
-                } else { // it has been removed by another thread, skip this element
+                    if (numQueuedRequestsInProgress == maxQueuedRequests) {
+                        channel.config().setAutoRead(true);
+                    }
+                    numQueuedRequestsInProgress--;
+                } else {
+                    log.error("Request was removed from queue, but that shouldn't be possible.");
                     continue;
                 }
             }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
index 4db93bb61c..e0f3c0ca40 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
@@ -99,7 +99,6 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
     @Getter
     private KopEventManager kopEventManager;
-    private OrderedScheduler sendResponseScheduler;
     private NamespaceBundleOwnershipListenerImpl bundleListener;
     private SchemaRegistryManager schemaRegistryManager;
     private MigrationManager migrationManager;
@@ -168,10 +167,6 @@ public void initialize(ServiceConfiguration conf) throws Exception {
         statsProvider = new PrometheusMetricsProvider();
         StatsLogger rootStatsLogger = statsProvider.getStatsLogger("");
         requestStats = new RequestStats(rootStatsLogger.scope(SERVER_SCOPE));
-        sendResponseScheduler = OrderedScheduler.newSchedulerBuilder()
-                .name("send-response")
-                .numThreads(kafkaConfig.getNumSendKafkaResponseThreads())
-                .build();
     }
 
     // This method is called after initialize
@@ -401,7 +396,6 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
             endPoint,
             kafkaConfig.isSkipMessagesWithoutIndex(),
             requestStats,
-            sendResponseScheduler,
             kafkaTopicManagerSharedState);
     }
@@ -482,7 +476,6 @@ public void close() {
         kafkaTopicManagerSharedState.close();
         kopBrokerLookupManager.close();
         statsProvider.stop();
-        sendResponseScheduler.shutdown();
     }
 
     @VisibleForTesting
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index f993f78418..c2896f33be 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -80,7 +80,6 @@ import java.util.stream.Stream;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -312,9 +311,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
                                EndPoint advertisedEndPoint,
                                boolean skipMessagesWithoutIndex,
                                RequestStats requestStats,
-                               OrderedScheduler sendResponseScheduler,
                                KafkaTopicManagerSharedState kafkaTopicManagerSharedState) throws Exception {
-        super(requestStats, kafkaConfig, sendResponseScheduler);
+        super(requestStats, kafkaConfig);
         this.pulsarService = pulsarService;
         this.tenantContextManager = tenantContextManager;
         this.replicaManager = replicaManager;
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 8ea2631ff7..876192f080 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -73,11 +73,6 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     //
     // --- Kafka on Pulsar Broker configuration ---
    //
-    @FieldContext(
-        category = CATEGORY_KOP,
-        doc = "The number of threads used to respond to the response."
-    )
-    private int numSendKafkaResponseThreads = 4;
 
     @FieldContext(
         required = true,