Skip to content

Commit 1631230

Browse files
committed
test: add unit tests
1 parent 10ac7d4 commit 1631230

File tree

3 files changed

+37
-6
lines changed

3 files changed

+37
-6
lines changed

src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,23 @@ public String getTopic(String fieldName) {
2929
return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME));
3030
}
3131

32-
public Instant getTimestampFromMessage(String fieldName) {
33-
return getTimeStampFromDescriptor(fieldName, message);
32+
public Instant getMessageTimeStamp(String metadataColumnName) {
33+
Descriptors.Descriptor metadataDescriptor = metadata.getDescriptorForType();
34+
com.google.protobuf.Timestamp timestamp;
35+
if (!metadataColumnName.isEmpty()) {
36+
DynamicMessage nestedMetadataMessage = (DynamicMessage) metadata.getField(metadataDescriptor.findFieldByName(metadataColumnName));
37+
Descriptors.Descriptor nestedMetadataMessageDescriptor = nestedMetadataMessage.getDescriptorForType();
38+
timestamp = (com.google.protobuf.Timestamp) nestedMetadataMessage.getField(nestedMetadataMessageDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME));
39+
} else {
40+
timestamp = (com.google.protobuf.Timestamp) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME));
41+
}
42+
long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds"));
43+
int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos"));
44+
return Instant.ofEpochSecond(seconds, nanos);
3445
}
3546

36-
public Instant getTimestampFromMetadata(String fieldName) {
37-
return getTimeStampFromDescriptor(fieldName, metadata);
47+
public Instant getTimestampFromMessage(String fieldName) {
48+
return getTimeStampFromDescriptor(fieldName, message);
3849
}
3950

4051
public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) {
@@ -50,8 +61,8 @@ public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
5061
switch (config.getFilePartitionTimeType()) {
5162
case MESSAGE_TIMESTAMP:
5263
return LocalDateTime.ofInstant(
53-
getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME),
54-
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
64+
getMessageTimeStamp(config.getOutputKafkaMetadataColumnName()),
65+
ZoneId.of("UTC"));
5566
case PROCESSING_TIMESTAMP:
5667
return LocalDateTime.now();
5768
default:

src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
public class RecordTest {
1616

1717
private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z");
18+
private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z");
1819
private final int defaultOrderNumber = 100;
1920
private final long defaultOffset = 1L;
2021
private final int defaultPartition = 1;
@@ -64,6 +65,7 @@ public void shouldGetDateTimeLocally() throws InterruptedException {
6465
@Test
6566
public void shouldGetDateTimeFromMessage() throws InterruptedException {
6667
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
68+
Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
6769
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
6870
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
6971
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
@@ -72,4 +74,17 @@ public void shouldGetDateTimeFromMessage() throws InterruptedException {
7274
LocalDateTime localDateTime = record.getLocalDateTime(config);
7375
Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime);
7476
}
77+
@Test
78+
public void shouldGetDateTimeFromKafkaMessage() throws InterruptedException {
79+
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
80+
Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP);
81+
Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field");
82+
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
83+
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
84+
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
85+
DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic);
86+
Record record = new Record(message, metadata);
87+
LocalDateTime localDateTime = record.getLocalDateTime(config);
88+
Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime);
89+
}
7590
}

src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.DynamicMessage;
44
import com.gotocompany.firehose.config.BlobSinkConfig;
5+
import com.gotocompany.firehose.config.enums.TimePartitionType;
56
import com.gotocompany.firehose.sink.blob.Constants;
67
import com.gotocompany.firehose.sink.blob.TestProtoMessage;
78
import com.gotocompany.firehose.sink.blob.TestUtils;
@@ -41,6 +42,7 @@ public void shouldCreateDayPartitioningPath() {
4142
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
4243
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
4344
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
45+
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
4446
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName);
4547
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn("date=");
4648
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn("");
@@ -58,6 +60,7 @@ public void shouldCreateHourPartitioningPath() {
5860
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
5961
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName);
6062
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
63+
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
6164
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR);
6265
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);
6366
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix);
@@ -91,6 +94,7 @@ public void shouldCreatePartitionPathWhenKafkaMetadataIsNotNested() {
9194
BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
9295
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
9396
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
97+
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
9498
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
9599
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);
96100
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix);
@@ -107,6 +111,7 @@ public void shouldCreatePartitioningPathForNestedKafkaMetadata() {
107111
Record record = new Record(message, metadata);
108112
BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
109113
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
114+
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
110115
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
111116
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
112117
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);

0 commit comments

Comments
 (0)