@@ -38,19 +38,23 @@ type queueError struct {
3838 message string
3939}
4040
41- // getUvarintSize returns the number of bytes to encode x in uvarint format
42- func getUvarintSize (x uint32 ) int {
43- if x < 128 {
44- return 1
45- } else if x < 16384 {
46- return 2
47- } else if x < 2097152 {
48- return 3
49- } else if x < 268435456 {
50- return 4
51- } else {
52- return 5
41+ // getNeededSize returns the number of bytes an entry of length need in the queue
42+ func getNeededSize (length int ) int {
43+ var header int
44+ switch {
45+ case length < 127 : // 1<<7-1
46+ header = 1
47+ case length < 16382 : // 1<<14-2
48+ header = 2
49+ case length < 2097149 : // 1<<21 -3
50+ header = 3
51+ case length < 268435452 : // 1<<28 -4
52+ header = 4
53+ default :
54+ header = 5
5355 }
56+
57+ return length + header
5458}
5559
5660// NewBytesQueue initialize new bytes queue.
@@ -82,22 +86,21 @@ func (q *BytesQueue) Reset() {
8286// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
8387// Returns index for pushed data or error if maximum size queue limit is reached.
8488func (q * BytesQueue ) Push (data []byte ) (int , error ) {
85- dataLen := len (data )
86- headerEntrySize := getUvarintSize (uint32 (dataLen ))
89+ neededSize := getNeededSize (len (data ))
8790
88- if ! q .canInsertAfterTail (dataLen + headerEntrySize ) {
89- if q .canInsertBeforeHead (dataLen + headerEntrySize ) {
91+ if ! q .canInsertAfterTail (neededSize ) {
92+ if q .canInsertBeforeHead (neededSize ) {
9093 q .tail = leftMarginIndex
91- } else if q .capacity + headerEntrySize + dataLen >= q .maxCapacity && q .maxCapacity > 0 {
94+ } else if q .capacity + neededSize >= q .maxCapacity && q .maxCapacity > 0 {
9295 return - 1 , & queueError {"Full queue. Maximum size limit reached." }
9396 } else {
94- q .allocateAdditionalMemory (dataLen + headerEntrySize )
97+ q .allocateAdditionalMemory (neededSize )
9598 }
9699 }
97100
98101 index := q .tail
99102
100- q .push (data , dataLen )
103+ q .push (data , neededSize )
101104
102105 return index , nil
103106}
@@ -120,9 +123,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
120123
121124 if q .tail <= q .head {
122125 if q .tail != q .head {
123- headerEntrySize := getUvarintSize (uint32 (q .head - q .tail ))
124- emptyBlobLen := q .head - q .tail - headerEntrySize
125- q .push (make ([]byte , emptyBlobLen ), emptyBlobLen )
126+ // created slice is slightly larger then need but this is fine after only the needed bytes are copied
127+ q .push (make ([]byte , q .head - q .tail ), q .head - q .tail )
126128 }
127129
128130 q .head = leftMarginIndex
@@ -141,7 +143,7 @@ func (q *BytesQueue) push(data []byte, len int) {
141143 headerEntrySize := binary .PutUvarint (q .headerBuffer , uint64 (len ))
142144 q .copy (q .headerBuffer , headerEntrySize )
143145
144- q .copy (data , len )
146+ q .copy (data , len - headerEntrySize )
145147
146148 if q .tail > q .head {
147149 q .rightMargin = q .tail
@@ -159,13 +161,12 @@ func (q *BytesQueue) copy(data []byte, len int) {
159161
160162// Pop reads the oldest entry from queue and moves head pointer to the next one
161163func (q * BytesQueue ) Pop () ([]byte , error ) {
162- data , headerEntrySize , err := q .peek (q .head )
164+ data , blockSize , err := q .peek (q .head )
163165 if err != nil {
164166 return nil , err
165167 }
166- size := len (data )
167168
168- q .head += headerEntrySize + size
169+ q .head += blockSize
169170 q .count --
170171
171172 if q .head == q .rightMargin {
@@ -238,7 +239,7 @@ func (q *BytesQueue) peek(index int) ([]byte, int, error) {
238239 }
239240
240241 blockSize , n := binary .Uvarint (q .array [index :])
241- return q .array [index + n : index + n + int (blockSize )], n , nil
242+ return q .array [index + n : index + int (blockSize )], int ( blockSize ) , nil
242243}
243244
244245// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
0 commit comments