Skip to content
This repository was archived by the owner on Jan 8, 2024. It is now read-only.

Commit 2b6f5c0

Browse files
jelyoussefiwagmarcel
authored andcommitted
BACKEND: rules update notification through kafka
1 parent 56b4183 commit 2b6f5c0

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

src/main/java/com/intel/databackend/config/cloudfoundry/ServiceConfig.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ public class ServiceConfig implements ServiceConfigProvider {
3434
public static final String KAFKA_SERVICE_NAME = "kafka";
3535
public static final String KAFKA_SERVICE_URI = "uri";
3636
public static final String KAFKA_UPS_NAME = "kafka-ups";
37-
public static final String KAFKA_UPS_TOPIC = "topic";
37+
public static final String KAFKA_UPS_TOPICS = "topics";
3838
public static final String KAFKA_UPS_ENABLED = "enabled";
3939
public static final String KAFKA_UPS_PARTITIONS = "partitions";
4040
public static final String KAFKA_UPS_REPLICATION = "replication";
4141
public static final String KAFKA_UPS_TIMEOUT_MS = "timeout_ms";
42+
public static final String KAFKA_OBSERVATIONS_TOPIC = "observations";
4243

4344
public static final String ZOOKEEPER_BROKER_NAME = "zookeeper";
4445
public static final String ZOOKEEPER_BROKER_URI = "zk.cluster";
@@ -101,7 +102,11 @@ public Boolean isKafkaEnabled() throws VcapEnvironmentException {
101102

102103
@Override
103104
public String getKafkaTopicName() throws VcapEnvironmentException {
104-
return getFieldValueFromJson(kafkaSettings, KAFKA_UPS_NAME, KAFKA_UPS_TOPIC, String.class);
105+
try {
106+
return getFieldValueFromJson(kafkaSettings.getJSONObject(KAFKA_UPS_TOPICS), KAFKA_UPS_TOPICS, KAFKA_OBSERVATIONS_TOPIC, String.class);
107+
} catch (JSONException e) {
108+
throw new VcapEnvironmentException("Cannot get kafka topic name", e);
109+
}
105110
}
106111

107112
@Override

src/test/java/com/intel/databackend/config/cloudfoundry/ServiceConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void Invoke_getUserProvidedServiceCredentialsByName() throws VcapEnvironm
4242
String topic = "metrics";
4343

4444
Mockito.when(vcapReaderServices.getUserProvidedServiceCredentialsByName(ServiceConfig.KAFKA_UPS_NAME))
45-
.thenReturn(new JSONObject("{" + ServiceConfig.KAFKA_UPS_TOPIC + ": " + topic + "}"));
45+
.thenReturn(new JSONObject("{" + ServiceConfig.KAFKA_UPS_TOPICS + ": " + "{" + ServiceConfig.KAFKA_OBSERVATIONS_TOPIC + ": " +topic + "} }"));
4646
serviceConfig.init();
4747
Assert.assertEquals(serviceConfig.getKafkaTopicName(), topic);
4848
}
@@ -85,7 +85,7 @@ public void Throws_error_when_response_is_empty() throws VcapEnvironmentExceptio
8585
@Test(expected = VcapEnvironmentException.class)
8686
public void Throws_error_when_response_not_contain_key() throws VcapEnvironmentException, JSONException {
8787
Mockito.when(vcapReaderServices.getVcapServiceCredentialsByType(ServiceConfig.KAFKA_SERVICE_NAME))
88-
.thenReturn(new JSONObject("{" + ServiceConfig.KAFKA_UPS_TOPIC + ": test}"));
88+
.thenReturn(new JSONObject("{" + ServiceConfig.KAFKA_UPS_TOPICS + ": " + "{" + ServiceConfig.KAFKA_OBSERVATIONS_TOPIC + ": test} }"));;
8989
serviceConfig.init();
9090
serviceConfig.getKafkaUri();
9191
}

0 commit comments

Comments
 (0)