Skip to content

Commit b6faf38

Browse files
authored
bump dependencies: kafka and zookeeper (#378)
- 'kafka_2.12', version: '2.4.0' -> 'kafka_2.12', version: '2.8.2' - 'kafka-clients', version: '2.3.1' -> 'kafka-clients', version: '2.8.2' - zookeeper 3.5.6 -> 3.8.0 ## Details new `NetworkClient` arguments: 1. `long connectionSetupTimeoutMs`, 2. `long connectionSetupTimeoutMaxMs` are described in https://issues.apache.org/jira/browse/KAFKA-9893 and corresponding PRs: 1. apache/kafka#8544 2. apache/kafka#8683 ## Testing Done 1. ./gradlew build
1 parent a97a60b commit b6faf38

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ allprojects {
4141
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
4242
compile 'com.timgroup:java-statsd-client:3.0.1'
4343
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
44-
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
45-
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
44+
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.8.2'
45+
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2'
4646
compile 'org.apache.commons:commons-lang3:3.12.0'
4747
compile 'com.linkedin.avroutil1:helper-all:0.2.81'
48+
compile 'org.apache.zookeeper:zookeeper:3.8.0'
4849
testCompile 'org.mockito:mockito-core:2.24.0'
4950
testCompile 'org.testng:testng:6.8.8'
5051
}

src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,30 +103,37 @@ public class OffsetCommitService implements Service {
103103
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
104104

105105
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
106-
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] ");
106+
107107
List<String> bootstrapServers = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
108108
List<InetSocketAddress> addresses =
109109
ClientUtils.parseAndValidateAddresses(bootstrapServers, ClientDnsLookup.DEFAULT);
110-
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time);
110+
111+
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + "] ");
112+
113+
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, _time, logContext);
111114

112115
LOGGER.info("Bootstrap servers config: {} | broker addresses: {}", bootstrapServers, addresses);
113116

114117
Metadata metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), logContext,
115118
new ClusterResourceListeners());
116119

117-
metadata.bootstrap(addresses, _time.milliseconds());
120+
metadata.bootstrap(addresses);
118121

119122
Selector selector =
120123
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), new Metrics(), _time,
121124
METRIC_GRP_PREFIX, channelBuilder, logContext);
122125

123-
KafkaClient kafkaClient = new NetworkClient(selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
126+
KafkaClient kafkaClient = new NetworkClient(
127+
selector, metadata, clientId, MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
124128
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
125129
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
126130
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
127-
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), ClientDnsLookup.DEFAULT, _time, true,
131+
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
132+
config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), config.getInt(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
133+
ClientDnsLookup.DEFAULT, _time, true,
128134
new ApiVersions(), logContext);
129135

136+
130137
LOGGER.debug("The network client active: {}", kafkaClient.active());
131138
LOGGER.debug("The network client has in flight requests: {}", kafkaClient.hasInFlightRequests());
132139
LOGGER.debug("The network client in flight request count: {}", kafkaClient.inFlightRequestCount());

0 commit comments

Comments
 (0)