Skip to content

Commit 80a5a22

Browse files
committed
MINOR: quic: refactor buffered STREAM ACK consuming
For the moment, streamdesc layer can only deal with in-order ACK at the stream level. Received out-of-order ACKs are buffered in a tree attached to a streambuf instance. Previously, caller of qc_stream_desc_ack() was responsible to implement consumption of these buffered ACKs. Refactor this by implementing it directly at the streamdesc layer within qc_stream_desc_ack(). This simplifies quic_rx ACK handling and ensure buffered ACKs are consumed as soon as possible.
1 parent 431785d commit 80a5a22

File tree

2 files changed

+91
-107
lines changed

2 files changed

+91
-107
lines changed

src/quic_rx.c

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel,
201201
return ret;
202202
}
203203

204-
/* Remove from <stream> the acknowledged frames.
205-
*
206-
* Returns 1 if at least one frame was removed else 0.
207-
*/
208-
static int quic_stream_try_to_consume(struct quic_conn *qc,
209-
struct qc_stream_desc *stream)
210-
{
211-
int ret;
212-
struct eb64_node *frm_node;
213-
struct qc_stream_buf *stream_buf;
214-
struct eb64_node *buf_node;
215-
216-
TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc);
217-
218-
ret = 0;
219-
buf_node = eb64_first(&stream->buf_tree);
220-
if (buf_node) {
221-
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
222-
offset_node);
223-
224-
frm_node = eb64_first(&stream_buf->acked_frms);
225-
while (frm_node) {
226-
struct qf_stream *strm_frm;
227-
struct quic_frame *frm;
228-
size_t offset;
229-
230-
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
231-
frm = container_of(strm_frm, struct quic_frame, stream);
232-
offset = strm_frm->offset.key;
233-
234-
if (offset > stream->ack_offset)
235-
break;
236-
237-
/* First delete frm from tree. This prevents BUG_ON() if
238-
* stream_buf instance is removed via qc_stream_desc_ack().
239-
*/
240-
eb64_delete(frm_node);
241-
242-
if (qc_stream_desc_ack(stream, frm)) {
243-
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
244-
qc, strm_frm, stream);
245-
ret = 1;
246-
}
247-
qc_release_frm(qc, frm);
248-
249-
/* Always retrieve first buffer as the previously used
250-
* instance could have been removed during qc_stream_desc_ack().
251-
*/
252-
buf_node = eb64_first(&stream->buf_tree);
253-
if (buf_node) {
254-
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
255-
offset_node);
256-
frm_node = eb64_first(&stream_buf->acked_frms);
257-
BUG_ON(!frm_node && !b_data(&stream_buf->buf));
258-
}
259-
else {
260-
frm_node = NULL;
261-
}
262-
}
263-
}
264-
265-
TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc);
266-
return ret;
267-
}
268-
269204
/* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
270205
* for this frame will be at least released in every cases.
271206
* Never fail.
@@ -298,13 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
298233
}
299234
stream = eb64_entry(node, struct qc_stream_desc, by_id);
300235

301-
TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
302236
if (!qc_stream_desc_ack(stream, frm)) {
303-
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
237+
TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
304238
qc, strm_frm, stream);
305239

306-
quic_stream_try_to_consume(qc, stream);
307-
308240
if (qc_stream_desc_done(stream)) {
309241
/* no need to continue if stream freed. */
310242
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);

src/quic_stream.c

Lines changed: 90 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,79 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
134134
}
135135
}
136136

137+
/* Acknowledges data for buffer <buf> attached to <stream> instance. This covers
138+
* the range strating at <offset> and of length <len>, with <fin> sets for the
139+
* last stream frame.
140+
*
141+
* Returns <buf> if there is still data to acknowledge or buffered ACK to
142+
* consume after completing the operation. Else, the next buffer instance of
143+
* stream is returned if it exists or NULL in the contrary case.
144+
*/
145+
static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
146+
struct qc_stream_desc *stream,
147+
uint64_t offset, uint64_t len, int fin)
148+
{
149+
/* This function does not deal with out-of-order ACK. */
150+
BUG_ON(offset > stream->ack_offset);
151+
152+
if (offset + len > stream->ack_offset) {
153+
const uint64_t diff = offset + len - stream->ack_offset;
154+
b_del(&buf->buf, diff);
155+
stream->ack_offset += diff;
156+
}
157+
158+
if (fin) {
159+
/* Mark FIN as acknowledged. */
160+
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
161+
}
162+
163+
if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
164+
qc_stream_buf_free(stream, &buf);
165+
/* Retrieve next buffer instance. */
166+
buf = !eb_is_empty(&stream->buf_tree) ?
167+
eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node) :
168+
NULL;
169+
}
170+
171+
return buf;
172+
}
173+
174+
/* Consume buffered ACK starting at <stream_buf>. If all buffer data is
175+
* removed, <stream_buf> is freed and consume will be conducted for following
176+
* streambufs from <stream> if present.
177+
*/
178+
static void qc_stream_buf_consume(struct qc_stream_buf *stream_buf,
179+
struct qc_stream_desc *stream)
180+
{
181+
struct eb64_node *frm_node;
182+
struct qf_stream *strm_frm;
183+
struct quic_frame *frm;
184+
uint64_t offset, len;
185+
int fin;
186+
187+
frm_node = eb64_first(&stream_buf->acked_frms);
188+
while (frm_node) {
189+
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
190+
frm = container_of(strm_frm, struct quic_frame, stream);
191+
192+
offset = strm_frm->offset.key;
193+
len = strm_frm->len;
194+
fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
195+
196+
if (offset > stream->ack_offset)
197+
break;
198+
199+
/* Delete frame before acknowledged it. This prevents BUG_ON()
200+
* on non-empty acked_frms tree when stream_buf is empty and removed.
201+
*/
202+
eb64_delete(frm_node);
203+
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
204+
qc_release_frm(NULL, frm);
205+
206+
frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
207+
}
208+
}
209+
137210
/* Acknowledge <frm> STREAM frame whose content is managed by <stream>
138211
* descriptor.
139212
*
@@ -150,63 +223,42 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
150223

151224
struct qc_stream_buf *stream_buf = NULL;
152225
struct eb64_node *buf_node;
153-
struct buffer *buf = NULL;
154-
size_t diff;
226+
int ret = 0;
155227

156228
/* Cannot advertise FIN for an inferior data range. */
157229
BUG_ON(fin && offset + len < stream->ack_offset);
158230

159-
if (offset + len < stream->ack_offset) {
160-
return 0;
231+
/* Do nothing for offset + len < stream->ack_offset as data were
232+
* already acknowledged and removed.
233+
*/
234+
235+
if (!len) {
236+
BUG_ON(!fin); /* An empty STREAM frame is only needed for a late FIN reporting. */
237+
238+
/* Empty STREAM frame with FIN can be acknowledged out-of-order. */
239+
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
161240
}
162241
else if (offset > stream->ack_offset) {
163242
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
164243
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
165-
166244
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
167245
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
168-
return 1;
246+
ret = 1;
169247
}
170-
171-
diff = offset + len - stream->ack_offset;
172-
if (diff) {
248+
else if (offset + len > stream->ack_offset) {
173249
/* Buf list cannot be empty if there is still unacked data. */
174250
BUG_ON(eb_is_empty(&stream->buf_tree));
175251

176252
/* get oldest buffer from buf tree */
177253
stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
178-
buf = &stream_buf->buf;
254+
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
179255

180-
stream->ack_offset += diff;
181-
b_del(buf, diff);
182-
183-
/* Free oldest buffer if all data acknowledged. */
184-
if (!b_data(buf)) {
185-
/* Remove buffered ACK before deleting buffer instance. */
186-
while (!eb_is_empty(&stream_buf->acked_frms)) {
187-
struct quic_conn *qc = stream->qc;
188-
struct eb64_node *frm_node;
189-
struct qf_stream *strm_frm;
190-
struct quic_frame *frm;
191-
192-
frm_node = eb64_first(&stream_buf->acked_frms);
193-
eb64_delete(frm_node);
194-
195-
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
196-
frm = container_of(strm_frm, struct quic_frame, stream);
197-
qc_release_frm(qc, frm);
198-
}
199-
qc_stream_buf_free(stream, &stream_buf);
200-
buf = NULL;
201-
}
202-
}
203-
204-
if (fin) {
205-
/* Mark FIN as acknowledged. */
206-
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
256+
/* some data were acknowledged, try to consume buffered ACKs */
257+
if (stream_buf)
258+
qc_stream_buf_consume(stream_buf, stream);
207259
}
208260

209-
return 0;
261+
return ret;
210262
}
211263

212264
/* Free the stream descriptor <stream> content. This function should be used

0 commit comments

Comments
 (0)