Uses content length to determine when to abort the stream. #14329
base: main
Conversation
  }
}

private long remainingInCurrentRequest() {
[nit] remainingInCurrentRequest should be bytesRemainingInCurrentRequest
S3InputStream(S3Client s3, S3URI location) {
  this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics());
S3InputStream(S3Client s3, S3URI location, long contentLength) {
Can you make a new constructor for these arguments?
This class is protected/private so we don't necessarily need to create a new constructor, but I'm not convinced we actually should do this.
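For illustration, the overload being asked about could look roughly like the sketch below. This is hypothetical, not the PR's final shape, and it assumes the five-argument constructor that this PR adds.

// Hypothetical overload: keeps the existing constructors untouched and delegates
// to the variant that also accepts a content length.
S3InputStream(S3Client s3, S3URI location, long contentLength) {
  this(s3, location, new S3FileIOProperties(), MetricsContext.nullMetrics(), contentLength);
}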
    return AnalyticsAcceleratorUtil.newStream(this);
  }
- return new S3InputStream(client(), uri(), s3FileIOProperties(), metrics());
+ return new S3InputStream(client(), uri(), s3FileIOProperties(), metrics(), getLength());
Re: getLength()
While this does not appear to have any impact as such, it does add a lot of noise to the logs
If this is just for the logs, there can be cases where getLength() turns into a getObjectMetadata call to S3. Is that a valid tradeoff?
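For reference, when the length is not already known, resolving it generally means a metadata (HEAD) round trip against S3, along the lines of this sketch (illustrative names, not the actual Iceberg code path):

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

// Illustrative only: the extra request that a length lookup can imply when the
// content length has not been supplied by Iceberg metadata or cached earlier.
static long resolveLength(S3Client s3, String bucket, String key) {
  return s3.headObject(
          HeadObjectRequest.builder().bucket(bucket).key(key).build())
      .contentLength();
}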
That read() causes the abort to fail, as it throws the exception.
The abort() code will swallow the exception right now, but this shows up as
software.amazon.awssdk.core.exception.RetryableException: Data read has a different checksum than expected. Was 0x4dd4fa955ccf4a27e2f635a22948298d, but expected 0x00000000000000000000000000000000. This commonly means that the data was corrupted between the client and service.
WARN S3InputStream: An error occurred while aborting the stream
And this happens pretty frequently.
I'm still quite new to Iceberg, but from my understanding, by this point we should already have the length and this shouldn't result in a new HEAD call. I expect:
- Either it's been passed down from the Avro metadata files
- Or the Parquet reader has already asked for it to figure out what bytes it needs for the footer.
I don't think this getLength() call will cause an extra HEAD, but I could be totally wrong. What do you think?
by this point we should already have the length and this shouldn't result in a new HEAD call
Iceberg's metadata.json and manifest list would not have the content length, but subsequent reads down the Iceberg metadata tree, i.e. manifest / data / delete files, should have it.
How about correcting the error message instead, if this is too much noise?
So you're saying we remove the LOG.warn("An error occurred while aborting the stream", e);? I'm not sure we want that abort() to fail silently.
I'm trying to figure out what it really means if an abort() fails on a stream that threw an exception. For example, what's happening right now is:
* We opened a stream from 0-EoF, where EoF is 500MB
* After reading 100MB, a retryable exception was thrown
* In the resetForRetry(), abort() is called, but it fails because the `read()` fails.
My expectation is that since the underlying stream already failed, it has released that connection and the abort()/close() is a no-op at this point. But I'm concerned that the abort() failing might mean that connections aren't getting released properly. Will try to confirm.
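For clarity, here is a rough, self-contained illustration of that sequence; it is not the actual Iceberg code, and reopen stands in for issuing a new ranged GET from a position (what resetForRetry() effectively does).

import java.io.IOException;
import java.io.InputStream;
import java.util.function.LongFunction;

final class RetrySketch {
  static int readWithOneRetry(InputStream stream, long pos, byte[] buf,
      LongFunction<InputStream> reopen) throws IOException {
    try {
      // e.g. fails ~100 MB into a 500 MB object with a retryable exception
      return stream.read(buf, 0, buf.length);
    } catch (IOException e) {
      // The current stream is abandoned and a new one is opened at pos. If the
      // abort path probes the broken stream with read() to detect EOF, that probe
      // can itself fail, which is where the warning in the logs comes from.
      try (InputStream retried = reopen.apply(pos)) {
        return retried.read(buf, 0, buf.length);
      }
    }
  }
}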
I am not saying remove it, I am suggesting rephrasing it.
that the abort() failing might mean that connections aren't getting released properly. Will try to confirm.
Agree, it will be good to confirm whether a failure when aborting the stream leads to the connection not being released back to the pool. If so, then I think it's a valid tradeoff vs introducing a new getLength() call for some objects.
abort() was introduced here in the first place as an optimization to reuse connections, please ref: https://github.com/apache/iceberg/pull/7262/files#r1156363240
private void abortStream() {
  try {
-   if (stream instanceof Abortable && stream.read() != -1) {
+   if (stream instanceof Abortable && remainingInCurrentRequest() > 0) {
Rather than adding the length check here, which adds extra requests when length isn't provided, why can't we just remove the stream.read() check here and quietly swallow errors on the abort path? I'm not sure we really need to validate the stream isn't consumed to abort. Even then, we can just ignore the exception since it's effectively closed at that point.
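If I read the suggestion correctly, a minimal sketch inside the existing class could look like this (Abortable is assumed to be the SDK's software.amazon.awssdk.http.Abortable already used by the current check):

// Sketch of the suggestion above: drop the read() probe and quietly swallow any
// failure on the abort path, since the stream is effectively done at that point.
private void abortStream() {
  try {
    if (stream instanceof Abortable) {
      ((Abortable) stream).abort();
    }
  } catch (Exception e) {
    // Intentionally ignored: aborting a stream that has already failed is best-effort.
  }
}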
@danielcweeks if we remove the read(), and also don't do the remainingInCurrentRequest(), then we will abort() the stream even when we're at EoF. From what I understand, calling abort() removes the connection, while close() allows the connection to be reused.
What is not clear to me is whether the same behaviour holds when you're at EoF. What I would like to avoid is removing connections from the pool when they could be re-used.
I am trying to confirm the above behaviour with the AWS SDK team, but do let me know if you have any advice here.
Since this is quite low level and depends on the HTTP client implementation, I haven't been able to find a clear answer yet. But I agree on not making the getLength() call just for this; my understanding was that the length would already be available at this point.
To add some historical context on why the stream.read() != -1 was added: #7262 (comment)
I think we should keep the read there and just remove the noisy warning messages, as @bryanck already called out in #7262 (comment). Otherwise the connection will be invalidated and removed from the pool, and thus won't be reused.
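A sketch of that alternative, again assuming the existing class and the SDK's Abortable: keep the read() probe so a fully consumed stream is close()d (letting its connection return to the pool) rather than abort()ed, but stop surfacing the expected failure as a WARN.

private void abortStream() {
  try {
    if (stream instanceof Abortable && stream.read() != -1) {
      ((Abortable) stream).abort();
    }
  } catch (Exception e) {
    // Expected when the stream has already failed; drop the noisy WARN
    // (or demote it to DEBUG) rather than logging it on every retry.
  }
}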
@ahmarsuhail A couple of comments, but overall, I really don't like that we're adding IO requests to satisfy an error case that we could work around in other ways. Most, but not all, reads will have a length provided from metadata, and there are a number of scenarios where that's not the case, which makes this a less than ideal tradeoff.
Thanks @danielcweeks, just for my knowledge, would you be able to share some examples of
Agree on not making that extra HEAD call, didn't realise that would be the case. Will try and get some answers around what's really happening on the
This PR:
- Adds ConnectionClosedException to the retry list. These surface fairly often, and currently lead to task failures.
- Removes the read() on the abort.

When abort() is called on stream close, it will do a read(). When the abort is the result of a reset after an exception, in resetForRetry(), that read() throws an exception. This seems to be because after a failure the underlying stream will return -1, which triggers the SDK's checksum validation. Since the read was not completed, the checksums are not updated, so the validation, and therefore the abort(), fails.

While this does not appear to have any impact as such, it does add a lot of noise to the logs. And instead of doing a read() to check EoF, this can be determined from the current pos in the stream vs the content length. This is what S3A does as well, here: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java#L732
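A condensed, illustrative sketch of the two changes described above. It is simplified rather than copied from the PR's diff: it assumes Apache HttpClient's org.apache.http.ConnectionClosedException and the SDK's Abortable interface, and the fields only mirror the discussion, not the real class layout.

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import org.apache.http.ConnectionClosedException;
import software.amazon.awssdk.http.Abortable;

class S3InputStreamSketch {
  // 1) Treat ConnectionClosedException as retryable alongside the existing retryable types.
  private static final Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS =
      Set.of(ConnectionClosedException.class /* plus the existing retryable exceptions */);

  private InputStream stream;  // the current ranged GET body
  private long pos;            // bytes consumed so far in the current request
  private long contentLength;  // known object length (from metadata or getLength())

  private long remainingInCurrentRequest() {
    return contentLength - pos;
  }

  // 2) Decide end-of-stream from positions instead of probing with read():
  //    abort only when unread bytes remain, otherwise close and let the
  //    connection go back to the pool for reuse.
  private void abortStream() throws IOException {
    if (stream instanceof Abortable && remainingInCurrentRequest() > 0) {
      ((Abortable) stream).abort();
    } else if (stream != null) {
      stream.close();
    }
  }
}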