Skip to content

Commit 10ac7d4

Browse files
committed
feat: adding message timestamp partition
1 parent 8b52fa6 commit 10ac7d4

File tree

5 files changed

+44
-22
lines changed

5 files changed

+44
-22
lines changed

src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.gotocompany.firehose.config.converter.BlobSinkFilePartitionTypeConverter;
44
import com.gotocompany.firehose.config.converter.BlobSinkLocalFileWriterTypeConverter;
55
import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter;
6+
import com.gotocompany.firehose.config.enums.TimePartitionType;
67
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;
78
import com.gotocompany.firehose.sink.blob.Constants;
89

@@ -49,9 +50,9 @@ public interface BlobSinkConfig extends AppConfig {
4950
@Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
5051
String getFilePartitionProtoTimestampFieldName();
5152

52-
@Key("SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED")
53-
@DefaultValue("false")
54-
boolean getFilePartitionProcessingTimeEnabled();
53+
@Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE")
54+
@DefaultValue("EVENT_TIMESTAMP")
55+
TimePartitionType getFilePartitionTimeType();
5556

5657
@Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE")
5758
@DefaultValue("day")

src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.gotocompany.firehose.config.enums;
2+
3+
/**
 * Selects which timestamp a blob-sink record is partitioned by.
 *
 * <p>As consumed by {@code Record.getLocalDateTime(BlobSinkConfig)} in this commit:
 * <ul>
 *   <li>{@code MESSAGE_TIMESTAMP} - timestamp read from the Kafka metadata message
 *       (field {@code KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME}).</li>
 *   <li>{@code PROCESSING_TIMESTAMP} - {@code LocalDateTime.now()} at the time of the call.</li>
 *   <li>{@code EVENT_TIMESTAMP} - timestamp field of the payload message, named by
 *       {@code SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME}; this is the configured
 *       default for {@code SINK_BLOB_FILE_PARTITION_TIME_TYPE}.</li>
 * </ul>
 */
public enum TimePartitionType {
4+
MESSAGE_TIMESTAMP,
5+
PROCESSING_TIMESTAMP,
6+
EVENT_TIMESTAMP
7+
8+
}

src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,36 @@ public String getTopic(String fieldName) {
2929
return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME));
3030
}
3131

32-
public Instant getTimestamp(String fieldName) {
33-
Descriptors.Descriptor descriptor = message.getDescriptorForType();
32+
/**
 * Reads a timestamp field from the payload {@code message} of this record.
 *
 * @param fieldName name of the timestamp field to look up on the message descriptor
 * @return the field value converted to an {@link Instant}
 */
public Instant getTimestampFromMessage(String fieldName) {
33+
return getTimeStampFromDescriptor(fieldName, message);
34+
}
35+
36+
/**
 * Reads a timestamp field from the {@code metadata} message of this record.
 *
 * @param fieldName name of the timestamp field to look up on the metadata descriptor
 * @return the field value converted to an {@link Instant}
 */
public Instant getTimestampFromMetadata(String fieldName) {
37+
return getTimeStampFromDescriptor(fieldName, metadata);
38+
}
39+
40+
/**
 * Extracts the field {@code fieldName} from the dynamic message {@code m} and converts it
 * to an {@link Instant}.
 *
 * <p>The field value is cast to a nested {@link DynamicMessage} whose {@code "seconds"}
 * (long) and {@code "nanos"} (int) fields are read - presumably a
 * {@code google.protobuf.Timestamp}; confirm against the schemas in use.
 *
 * <p>NOTE(review): the result of {@code findFieldByName} is not null-checked here, so an
 * unknown {@code fieldName} is not reported with a descriptive error - verify how callers
 * validate the configured field name.
 *
 * @param fieldName name of the timestamp field on {@code m}'s descriptor
 * @param m message to read the field from (payload or metadata)
 * @return {@code Instant.ofEpochSecond(seconds, nanos)} built from the nested fields
 */
public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) {
41+
Descriptors.Descriptor descriptor = m.getDescriptorForType();
3442
Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName(fieldName);
35-
DynamicMessage timestamp = (DynamicMessage) message.getField(timestampField);
43+
DynamicMessage timestamp = (DynamicMessage) m.getField(timestampField);
3644
long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds"));
3745
int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos"));
3846
return Instant.ofEpochSecond(seconds, nanos);
3947
}
4048

4149
/**
 * Resolves the date-time used to partition this record, based on
 * {@code config.getFilePartitionTimeType()}.
 *
 * <ul>
 *   <li>{@code MESSAGE_TIMESTAMP}: the metadata field
 *       {@code KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME}, converted using the
 *       zone from {@code config.getFilePartitionProtoTimestampTimezone()}.</li>
 *   <li>{@code PROCESSING_TIMESTAMP}: {@code LocalDateTime.now()}; no zone argument is
 *       passed, so the configured timezone is not applied on this branch.</li>
 *   <li>any other value (the {@code default} branch, i.e. {@code EVENT_TIMESTAMP}): the
 *       payload field named by {@code config.getFilePartitionProtoTimestampFieldName()},
 *       converted using the configured zone.</li>
 * </ul>
 *
 * <p>This replaces the former boolean switch
 * {@code getFilePartitionProcessingTimeEnabled()} (the removed lines of this hunk).
 *
 * @param config blob sink configuration supplying the partition time type, field name and zone
 * @return the local date-time to partition by
 */
public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
42-
if (config.getFilePartitionProcessingTimeEnabled()) {
43-
return LocalDateTime.now();
44-
} else {
45-
return LocalDateTime.ofInstant(
46-
getTimestamp(config.getFilePartitionProtoTimestampFieldName()),
47-
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
50+
switch (config.getFilePartitionTimeType()) {
51+
case MESSAGE_TIMESTAMP:
52+
return LocalDateTime.ofInstant(
53+
getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME),
54+
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
55+
case PROCESSING_TIMESTAMP:
56+
return LocalDateTime.now();
57+
default:
58+
return LocalDateTime.ofInstant(
59+
getTimestampFromMessage(config.getFilePartitionProtoTimestampFieldName()),
60+
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
61+
4862
}
4963
}
5064
}

src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.DynamicMessage;
44
import com.gotocompany.firehose.config.BlobSinkConfig;
5+
import com.gotocompany.firehose.config.enums.TimePartitionType;
56
import com.gotocompany.firehose.sink.blob.TestUtils;
67
import org.junit.Assert;
78
import org.junit.Test;
@@ -41,13 +42,13 @@ public void shouldGetTimeStampFromMessage() {
4142
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
4243
DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
4344
Record record = new Record(message, metadata);
44-
Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time"));
45+
Assert.assertEquals(defaultTimestamp, record.getTimestampFromMessage("created_time"));
4546
}
4647

4748
@Test
4849
public void shouldGetDateTimeLocally() throws InterruptedException {
4950
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
50-
Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(true);
51+
Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.PROCESSING_TIMESTAMP);
5152
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
5253
DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
5354
Record record = new Record(message, metadata);
@@ -63,7 +64,6 @@ public void shouldGetDateTimeLocally() throws InterruptedException {
6364
@Test
6465
public void shouldGetDateTimeFromMessage() throws InterruptedException {
6566
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
66-
Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
6767
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
6868
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
6969
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);

src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public void setUp() {
6060
MockitoAnnotations.initMocks(this);
6161
this.sinkConfig = Mockito.mock(BlobSinkConfig.class);
6262
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
63-
Mockito.when(sinkConfig.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
6463
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn("");
6564
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
6665
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR);
@@ -72,7 +71,7 @@ public void setUp() {
7271
public void shouldCreateLocalFileWriter() throws Exception {
7372
Record record = Mockito.mock(Record.class);
7473
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
75-
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
74+
Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
7675
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
7776
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");
7877
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1);
@@ -87,15 +86,15 @@ public void shouldCreateLocalFileWriter() throws Exception {
8786
public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
8887
Record record1 = Mockito.mock(Record.class);
8988
Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone)));
90-
Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
89+
Mockito.when(record1.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
9190
Mockito.when(record1.getTopic("")).thenReturn(defaultTopic);
9291
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1);
9392
Mockito.when(localFileWriter1.write(record1)).thenReturn(true);
9493
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test1");
9594

9695
Record record2 = Mockito.mock(Record.class);
9796
Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone)));
98-
Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L));
97+
Mockito.when(record2.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L));
9998
Mockito.when(record2.getTopic("")).thenReturn(defaultTopic);
10099
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2);
101100
Mockito.when(localFileWriter2.write(record2)).thenReturn(true);
@@ -113,7 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
113112
public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
114113
Record record = Mockito.mock(Record.class);
115114
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
116-
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
115+
Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
117116
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
118117
Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
119118
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1);
@@ -128,7 +127,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
128127
expectedException.expect(LocalFileWriterFailedException.class);
129128
Record record = Mockito.mock(Record.class);
130129
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
131-
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
130+
Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
132131
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
133132
Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
134133
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenThrow(new LocalFileWriterFailedException(new IOException("Some error")));
@@ -140,7 +139,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
140139
@Test
141140
public void shouldGetEmptyFlushedPath() throws Exception {
142141
Record record = Mockito.mock(Record.class);
143-
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
142+
Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
144143
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone)));
145144
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
146145
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");

0 commit comments

Comments
 (0)