Skip to content

Commit 603cb95

Browse files
authored
Data: Encode Nanoseconds into JSON (#647)
If greater than microsecond precision is present in a Time Field, a corresponding "nanos" property is added to json encoding capture the additional precision.
1 parent e675b59 commit 603cb95

File tree

4 files changed

+228
-28
lines changed

4 files changed

+228
-28
lines changed

data/frame.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"math"
1717
"reflect"
1818
"strings"
19-
"time"
2019

2120
"github.com/google/go-cmp/cmp"
2221
"github.com/google/go-cmp/cmp/cmpopts"
@@ -400,19 +399,6 @@ func FrameTestCompareOptions() []cmp.Option {
400399
x == y
401400
})
402401

403-
times := cmp.Comparer(func(x, y time.Time) bool {
404-
if !x.Equal(y) {
405-
// Check that the millisecond precision is the same.
406-
// Avoids problems like:
407-
// - s"1970-04-14 21:59:59.254740991 -0800 PST",
408-
// + s"1970-04-14 21:59:59.254 -0800 PST",
409-
xMS := x.UnixNano() / int64(time.Millisecond)
410-
yMS := y.UnixNano() / int64(time.Millisecond)
411-
return xMS == yMS
412-
}
413-
return true
414-
})
415-
416402
metas := cmp.Comparer(func(x, y *FrameMeta) bool {
417403
// This checks that the meta attached to the frame and
418404
// in the Golden file are the same. A conversion to JSON
@@ -439,7 +425,7 @@ func FrameTestCompareOptions() []cmp.Option {
439425
})
440426

441427
unexportedField := cmp.AllowUnexported(Field{})
442-
return []cmp.Option{f32s, f32Ptrs, f64s, f64Ptrs, confFloats, times, metas, rawjs, unexportedField, cmpopts.EquateEmpty()}
428+
return []cmp.Option{f32s, f32Ptrs, f64s, f64Ptrs, confFloats, metas, rawjs, unexportedField, cmpopts.EquateEmpty()}
443429
}
444430

445431
const maxLengthExceededStr = "..."

data/frame_json.go

Lines changed: 102 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ func readDataFrameJSON(frame *Frame, iter *jsoniter.Iterator) error {
243243
iter.ReportError("bind l1", "unexpected field: "+l1Field)
244244
}
245245
}
246-
247246
return iter.Error
248247
}
249248

@@ -260,19 +259,22 @@ func readDataFramesJSON(frames *Frames, iter *jsoniter.Iterator) error {
260259
}
261260

262261
func readFrameData(iter *jsoniter.Iterator, frame *Frame) error {
262+
var readValues, readNanos bool
263+
nanos := make([][]int64, len(frame.Fields))
263264
for l2Field := iter.ReadObject(); l2Field != ""; l2Field = iter.ReadObject() {
264265
switch l2Field {
265266
case "values":
266267
if !iter.ReadArray() {
267268
continue // empty fields
268269
}
269-
270+
var fieldIndex int
270271
// Load the first field with a generic interface.
271272
// The length of the first will be assumed for the other fields
272273
// and can have a specialized parser
273274
if frame.Fields == nil {
274275
return errors.New("fields is nil, malformed key order or frame without schema")
275276
}
277+
276278
field := frame.Fields[0]
277279
first := make([]interface{}, 0)
278280
iter.ReadVal(&first)
@@ -283,16 +285,34 @@ func readFrameData(iter *jsoniter.Iterator, frame *Frame) error {
283285
field.vector = vec
284286
size := len(first)
285287

286-
fieldIndex := 1
288+
addNanos := func() {
289+
if readNanos {
290+
if nanos[fieldIndex] != nil {
291+
for i := 0; i < size; i++ {
292+
t, ok := field.ConcreteAt(i)
293+
if !ok {
294+
continue
295+
}
296+
field.Set(i, t.(time.Time).Add(time.Nanosecond*time.Duration(nanos[fieldIndex][i])))
297+
}
298+
}
299+
}
300+
}
301+
302+
addNanos()
303+
fieldIndex++
287304
for iter.ReadArray() {
288305
field = frame.Fields[fieldIndex]
289306
vec, err = readVector(iter, field.Type(), size)
290307
if err != nil {
291308
return err
292309
}
310+
293311
field.vector = vec
312+
addNanos()
294313
fieldIndex++
295314
}
315+
readValues = true
296316

297317
case "entities":
298318
fieldIndex := 0
@@ -308,13 +328,40 @@ func readFrameData(iter *jsoniter.Iterator, frame *Frame) error {
308328
}
309329
}
310330
} else {
311-
iter.ReadAny() // skip nills
331+
iter.ReadAny() // skip nils
312332
}
313333
fieldIndex++
314334
}
315335

316-
default:
317-
iter.ReportError("bind l2", "unexpected field: "+l2Field)
336+
case "nanos":
337+
fieldIndex := 0
338+
for iter.ReadArray() {
339+
field := frame.Fields[fieldIndex]
340+
341+
t := iter.WhatIsNext()
342+
if t == jsoniter.ArrayValue {
343+
for idx := 0; iter.ReadArray(); idx++ {
344+
ns := iter.ReadInt64()
345+
if readValues {
346+
t, ok := field.vector.ConcreteAt(idx)
347+
if !ok {
348+
continue
349+
}
350+
tWithNS := t.(time.Time).Add(time.Nanosecond * time.Duration(ns))
351+
field.vector.SetConcrete(idx, tWithNS)
352+
continue
353+
}
354+
if idx == 0 {
355+
nanos[fieldIndex] = append(nanos[fieldIndex], ns)
356+
}
357+
}
358+
} else {
359+
iter.ReadAny() // skip nils
360+
}
361+
fieldIndex++
362+
}
363+
364+
readNanos = true
318365
}
319366
}
320367
return nil
@@ -793,13 +840,18 @@ func writeDataFrameData(frame *Frame, stream *jsoniter.Stream) {
793840
entities := make([]*fieldEntityLookup, len(frame.Fields))
794841
entityCount := 0
795842

843+
nanos := make([][]int64, len(frame.Fields))
844+
nsOffSetCount := 0
845+
796846
stream.WriteObjectField("values")
797847
stream.WriteArrayStart()
798848
for fidx, f := range frame.Fields {
799849
if fidx > 0 {
800850
stream.WriteMore()
801851
}
802852
isTime := f.Type().Time()
853+
nsTime := make([]int64, rowCount)
854+
var hasNSTime bool
803855
isFloat := f.Type() == FieldTypeFloat64 || f.Type() == FieldTypeNullableFloat64 ||
804856
f.Type() == FieldTypeFloat32 || f.Type() == FieldTypeNullableFloat32
805857

@@ -811,8 +863,14 @@ func writeDataFrameData(frame *Frame, stream *jsoniter.Stream) {
811863
if v, ok := f.ConcreteAt(i); ok {
812864
switch {
813865
case isTime:
814-
vTyped := v.(time.Time).UnixNano() / int64(time.Millisecond) // Milliseconds precision.
815-
stream.WriteVal(vTyped)
866+
t := v.(time.Time)
867+
stream.WriteVal(t.UnixMilli())
868+
msRes := t.Truncate(time.Millisecond)
869+
ns := t.Sub(msRes).Nanoseconds()
870+
if ns != 0 {
871+
hasNSTime = true
872+
nsTime[i] = ns
873+
}
816874
case isFloat:
817875
// For float and nullable float we check whether a value is a special
818876
// entity (NaN, -Inf, +Inf) not supported by JSON spec, we then encode this
@@ -848,6 +906,10 @@ func writeDataFrameData(frame *Frame, stream *jsoniter.Stream) {
848906
}
849907
}
850908
stream.WriteArrayEnd()
909+
if hasNSTime {
910+
nanos[fidx] = nsTime
911+
nsOffSetCount++
912+
}
851913
}
852914
stream.WriteArrayEnd()
853915

@@ -857,6 +919,12 @@ func writeDataFrameData(frame *Frame, stream *jsoniter.Stream) {
857919
stream.WriteVal(entities)
858920
}
859921

922+
if nsOffSetCount > 0 {
923+
stream.WriteMore()
924+
stream.WriteObjectField("nanos")
925+
stream.WriteVal(nanos)
926+
}
927+
860928
stream.WriteObjectEnd()
861929
}
862930

@@ -1027,6 +1095,8 @@ func writeArrowData(stream *jsoniter.Stream, record array.Record) error {
10271095
stream.WriteObjectStart()
10281096

10291097
entities := make([]*fieldEntityLookup, fieldCount)
1098+
nanos := make([][]int64, fieldCount)
1099+
var hasNano bool
10301100
entityCount := 0
10311101

10321102
stream.WriteObjectField("values")
@@ -1040,7 +1110,11 @@ func writeArrowData(stream *jsoniter.Stream, record array.Record) error {
10401110

10411111
switch col.DataType().ID() {
10421112
case arrow.TIMESTAMP:
1043-
writeArrowDataTIMESTAMP(stream, col)
1113+
nanoOffset := writeArrowDataTIMESTAMP(stream, col)
1114+
if nanoOffset != nil {
1115+
nanos[fidx] = nanoOffset
1116+
hasNano = true
1117+
}
10441118

10451119
case arrow.UINT8:
10461120
ent = writeArrowDataUint8(stream, col)
@@ -1085,14 +1159,21 @@ func writeArrowData(stream *jsoniter.Stream, record array.Record) error {
10851159
stream.WriteVal(entities)
10861160
}
10871161

1162+
if hasNano {
1163+
stream.WriteMore()
1164+
stream.WriteObjectField("nanos")
1165+
stream.WriteVal(nanos)
1166+
}
1167+
10881168
stream.WriteObjectEnd()
10891169
return nil
10901170
}
10911171

10921172
// Custom timestamp extraction... assumes nanoseconds for everything now
1093-
func writeArrowDataTIMESTAMP(stream *jsoniter.Stream, col array.Interface) {
1173+
func writeArrowDataTIMESTAMP(stream *jsoniter.Stream, col array.Interface) []int64 {
10941174
count := col.Len()
1095-
1175+
var hasNSTime bool
1176+
nsTime := make([]int64, count)
10961177
v := array.NewTimestampData(col.Data())
10971178
stream.WriteArrayStart()
10981179
for i := 0; i < count; i++ {
@@ -1107,12 +1188,22 @@ func writeArrowDataTIMESTAMP(stream *jsoniter.Stream, col array.Interface) {
11071188
ms := int64(ns) / int64(time.Millisecond) // nanosecond assumption
11081189
stream.WriteInt64(ms)
11091190

1191+
nsOffSet := int64(ns) - ms*int64(1e6)
1192+
if nsOffSet != 0 {
1193+
hasNSTime = true
1194+
nsTime[i] = nsOffSet
1195+
}
1196+
11101197
if stream.Error != nil { // ???
11111198
stream.Error = nil
11121199
stream.WriteNil()
11131200
}
11141201
}
11151202
stream.WriteArrayEnd()
1203+
if hasNSTime {
1204+
return nsTime
1205+
}
1206+
return nil
11161207
}
11171208

11181209
func readTimeVectorJSON(iter *jsoniter.Iterator, nullable bool, size int) (vector, error) {

data/frame_json_test.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010
"testing"
1111
"text/template"
12+
"time"
1213

1314
jsoniter "github.com/json-iterator/go"
1415
"golang.org/x/text/cases"
@@ -40,7 +41,7 @@ func TestGoldenFrameJSON(t *testing.T) {
4041
fmt.Println(strF)
4142
fmt.Println(`}`)
4243

43-
assert.JSONEq(t, strF, strA, "arrow and frames should produce the same json")
44+
require.JSONEq(t, strF, strA, "arrow and frames should produce the same json")
4445

4546
goldenFile := filepath.Join("testdata", "all_types.golden.json")
4647
if _, err := os.Stat(goldenFile); os.IsNotExist(err) {
@@ -52,7 +53,7 @@ func TestGoldenFrameJSON(t *testing.T) {
5253
require.NoError(t, err)
5354

5455
strG := string(b)
55-
assert.JSONEq(t, strF, strG, "saved json must match produced json")
56+
require.JSONEq(t, strF, strG, "saved json must match produced json")
5657

5758
// Read the frame from json
5859
out := &data.Frame{}
@@ -70,6 +71,91 @@ type simpleTestObj struct {
7071
FType2 *data.FieldType `json:"typePtr,omitempty"`
7172
}
7273

74+
func TestJSONNanoTime(t *testing.T) {
75+
t.Run("time no nano", func(t *testing.T) {
76+
noNanoFrame := data.NewFrame("frame_no_nano",
77+
// 1 second and 1 MS
78+
data.NewField("t", nil, []time.Time{time.Unix(1, 1000000)}),
79+
)
80+
81+
noNanoJSONBytes, err := json.Marshal(noNanoFrame)
82+
require.NoError(t, err)
83+
84+
noNanoFrameFromJSON := &data.Frame{}
85+
err = json.Unmarshal(noNanoJSONBytes, noNanoFrameFromJSON)
86+
require.NoError(t, err)
87+
88+
if diff := cmp.Diff(noNanoFrame, noNanoFrameFromJSON, data.FrameTestCompareOptions()...); diff != "" {
89+
t.Errorf("Result mismatch (-want +got):\n%s", diff)
90+
}
91+
})
92+
93+
t.Run("time with nano", func(t *testing.T) {
94+
nanoFrame := data.NewFrame("frame_nano",
95+
// 1 second and 10 ns
96+
data.NewField("i", nil, []int64{1}),
97+
data.NewField("t", nil, []time.Time{time.Unix(1, 10)}),
98+
)
99+
100+
nanoJSONBytes, err := json.Marshal(nanoFrame)
101+
require.NoError(t, err)
102+
103+
nanoFrameFromJSON := &data.Frame{}
104+
err = json.Unmarshal(nanoJSONBytes, nanoFrameFromJSON)
105+
require.NoError(t, err)
106+
107+
if diff := cmp.Diff(nanoFrame, nanoFrameFromJSON, data.FrameTestCompareOptions()...); diff != "" {
108+
t.Errorf("Result mismatch (-want +got):\n%s", diff)
109+
}
110+
})
111+
112+
t.Run("nullable with nano", func(t *testing.T) {
113+
nanoFrame := data.NewFrame("frame_nano",
114+
// 1 second and 10 ns
115+
data.NewField("i", nil, []int64{1, 2}),
116+
data.NewField("t", nil, []*time.Time{nil, timePtr(time.Unix(1, 10))}),
117+
)
118+
119+
nanoJSONBytes, err := json.Marshal(nanoFrame)
120+
require.NoError(t, err)
121+
122+
nanoFrameFromJSON := &data.Frame{}
123+
err = json.Unmarshal(nanoJSONBytes, nanoFrameFromJSON)
124+
require.NoError(t, err)
125+
126+
if diff := cmp.Diff(nanoFrame, nanoFrameFromJSON, data.FrameTestCompareOptions()...); diff != "" {
127+
t.Errorf("Result mismatch (-want +got):\n%s", diff)
128+
}
129+
})
130+
131+
t.Run("nanos before values property in data", func(t *testing.T) {
132+
jsString := `{
133+
"schema": {
134+
"name": "frame_nano",
135+
"fields": [
136+
{ "name": "i", "type": "number", "typeInfo": { "frame": "int64" } },
137+
{ "name": "t", "type": "time", "typeInfo": { "frame": "time.Time" } }
138+
]
139+
},
140+
"data": { "nanos": [null, [10]], "values": [[1], [1000]] }
141+
}`
142+
143+
nanoFrame := data.NewFrame("frame_nano",
144+
// 1 second and 10 ns
145+
data.NewField("i", nil, []int64{1}),
146+
data.NewField("t", nil, []time.Time{time.Unix(1, 10)}),
147+
)
148+
149+
nanoFrameFromJSON := &data.Frame{}
150+
err := json.Unmarshal([]byte(jsString), nanoFrameFromJSON)
151+
require.NoError(t, err)
152+
153+
if diff := cmp.Diff(nanoFrame, nanoFrameFromJSON, data.FrameTestCompareOptions()...); diff != "" {
154+
require.Fail(t, "Result mismatch (-want +got):\n%s", diff)
155+
}
156+
})
157+
}
158+
73159
// TestFieldTypeToJSON makes sure field type will read/write to json
74160
func TestFieldTypeToJSON(t *testing.T) {
75161
v := simpleTestObj{

0 commit comments

Comments
 (0)