2424
2525import java .io .IOException ;
2626import java .time .Instant ;
27+ import java .time .LocalDateTime ;
28+ import java .time .ZoneId ;
2729import java .util .HashSet ;
2830import java .util .Set ;
2931
@@ -58,6 +60,7 @@ public void setUp() {
5860 MockitoAnnotations .initMocks (this );
5961 this .sinkConfig = Mockito .mock (BlobSinkConfig .class );
6062 Mockito .when (sinkConfig .getFilePartitionProtoTimestampTimezone ()).thenReturn (zone );
63+ Mockito .when (sinkConfig .getFilePartitionProcessingTimeEnabled ()).thenReturn (false );
6164 Mockito .when (sinkConfig .getOutputKafkaMetadataColumnName ()).thenReturn ("" );
6265 Mockito .when (sinkConfig .getFilePartitionProtoTimestampFieldName ()).thenReturn (timeStampFieldName );
6366 Mockito .when (sinkConfig .getFilePartitionTimeGranularityType ()).thenReturn (Constants .FilePartitionType .HOUR );
@@ -68,6 +71,7 @@ public void setUp() {
6871 @ Test
6972 public void shouldCreateLocalFileWriter () throws Exception {
7073 Record record = Mockito .mock (Record .class );
74+ Mockito .when (record .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .now ());
7175 Mockito .when (record .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (1L ));
7276 Mockito .when (record .getTopic ("" )).thenReturn (defaultTopic );
7377 Mockito .when (localFileWriter1 .getFullPath ()).thenReturn ("/tmp/test" );
@@ -82,13 +86,15 @@ public void shouldCreateLocalFileWriter() throws Exception {
8286 @ Test
8387 public void shouldCreateMultipleWriterBasedOnPartition () throws Exception {
8488 Record record1 = Mockito .mock (Record .class );
89+ Mockito .when (record1 .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .ofInstant (Instant .ofEpochMilli (3600000L ), ZoneId .of (zone )));
8590 Mockito .when (record1 .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (3600000L ));
8691 Mockito .when (record1 .getTopic ("" )).thenReturn (defaultTopic );
8792 Mockito .when (localStorage .createLocalFileWriter (TimePartitionedPathUtils .getTimePartitionedPath (record1 , sinkConfig ))).thenReturn (localFileWriter1 );
8893 Mockito .when (localFileWriter1 .write (record1 )).thenReturn (true );
8994 Mockito .when (localFileWriter1 .getFullPath ()).thenReturn ("/tmp/test1" );
9095
9196 Record record2 = Mockito .mock (Record .class );
97+ Mockito .when (record2 .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .ofInstant (Instant .ofEpochMilli (7200000L ), ZoneId .of (zone )));
9298 Mockito .when (record2 .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (7200000L ));
9399 Mockito .when (record2 .getTopic ("" )).thenReturn (defaultTopic );
94100 Mockito .when (localStorage .createLocalFileWriter (TimePartitionedPathUtils .getTimePartitionedPath (record2 , sinkConfig ))).thenReturn (localFileWriter2 );
@@ -106,6 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
106112 @ Test (expected = IOException .class )
107113 public void shouldThrowIOExceptionWhenWriteThrowsException () throws Exception {
108114 Record record = Mockito .mock (Record .class );
115+ Mockito .when (record .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .now ());
109116 Mockito .when (record .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (3600000L ));
110117 Mockito .when (record .getTopic ("" )).thenReturn (defaultTopic );
111118 Mockito .when (localFileWriter1 .getMetadata ()).thenReturn (new LocalFileMetadata ("/tmp/" , "/tmp/test1" , 0 , 0 , 0 ));
@@ -120,6 +127,7 @@ public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
120127 public void shouldThrowIOExceptionWhenOpenNewWriterFailed () throws Exception {
121128 expectedException .expect (LocalFileWriterFailedException .class );
122129 Record record = Mockito .mock (Record .class );
130+ Mockito .when (record .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .now ());
123131 Mockito .when (record .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (3600000L ));
124132 Mockito .when (record .getTopic ("" )).thenReturn (defaultTopic );
125133 Mockito .when (localFileWriter1 .getMetadata ()).thenReturn (new LocalFileMetadata ("/tmp/" , "/tmp/test1" , 0 , 0 , 0 ));
@@ -133,6 +141,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
133141 public void shouldGetEmptyFlushedPath () throws Exception {
134142 Record record = Mockito .mock (Record .class );
135143 Mockito .when (record .getTimestamp (timeStampFieldName )).thenReturn (Instant .ofEpochMilli (1L ));
144+ Mockito .when (record .getLocalDateTime (sinkConfig )).thenReturn (LocalDateTime .ofInstant (Instant .ofEpochMilli (1L ), ZoneId .of (zone )));
136145 Mockito .when (record .getTopic ("" )).thenReturn (defaultTopic );
137146 Mockito .when (localFileWriter1 .getFullPath ()).thenReturn ("/tmp/test" );
138147 Mockito .when (localStorage .createLocalFileWriter (TimePartitionedPathUtils .getTimePartitionedPath (record , sinkConfig ))).thenReturn (localFileWriter1 );
0 commit comments