Fix filesystem #2291
Conversation
pyiceberg/io/pyarrow.py (Outdated)
```python
return uri.scheme, uri.netloc, uri.path

# Load defaults from environment
default_scheme = os.getenv("DEFAULT_SCHEME", "file")
```
Can we use the central config instead of direct usage of env variables? (pyiceberg/utils/config.py) This would enable configuration via a file OR env variables, which is how most other configs are documented and exposed to catalog construction.
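For reference, a minimal sketch of what that could look like, assuming the central Config helper exposes the get_str accessor that the diff further down in this thread uses:

```python
from pyiceberg.utils.config import Config

# Sketch only: resolve defaults through the central config so that both
# the configuration file and environment variables are honored.
config = Config()
default_scheme = config.get_str("default-scheme") or "file"
default_netloc = config.get_str("default-netloc") or ""
```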
Thanks Tom! Overall excited for this PR, because I've had to hack around this with an overridden PyArrowFileIO, e.g.:

```python
class HDFSFileIO(PyArrowFileIO):
    """Simple PyArrowFileIO that defaults paths without a scheme to HDFS"""

    @override
    def new_input(self, location: str) -> PyArrowFile:
        """Fix paths without a scheme to use HDFS"""
        if not urlparse(location).scheme and location.startswith('/'):
            hdfs_host = self.properties.get(HDFS_HOST, 'localhost')
            hdfs_port = self.properties.get(HDFS_PORT, '9000')
            location = f'hdfs://{hdfs_host}:{hdfs_port}{location}'
        return super().new_input(location)
```
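A hypothetical usage of this override; the host/port values are placeholders, and HDFS_HOST/HDFS_PORT are assumed to be the "hdfs.host"/"hdfs.port" property keys from pyiceberg.io:

```python
# Placeholder cluster coordinates; keys assumed to match HDFS_HOST/HDFS_PORT.
io = HDFSFileIO(**{"hdfs.host": "namenode.example.com", "hdfs.port": "9000"})

# A schemeless absolute path is rewritten before PyArrow sees it:
f = io.new_input("/warehouse/db/tbl/metadata/v1.metadata.json")
# -> hdfs://namenode.example.com:9000/warehouse/db/tbl/metadata/v1.metadata.json
```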
Thanks for the PR! I added a comment about passing the properties and structuring the hdfs-specific code.
pyiceberg/io/pyarrow.py (Outdated)
```python
default_scheme = config.get_str("default-scheme") or "file"
default_netloc = config.get_str("default-netloc") or ""
```
Nit: I think it's better to pass these in through the properties field: https://py.iceberg.apache.org/configuration/#hdfs

We can get the env variable and then pass it into the properties.
I agree with @kevinjqliu that properties are more appropriate, since this will allow setting it for each catalog.
This would look something like:

```python
PyArrowFileIO(**{
    'hdfs.default-scheme': 'hdfs',
    'hdfs.default-netloc': 'netloc:8000',
})
```
Or you can set them through environment variables:

```shell
export PYICEBERG_CATALOG__DEFAULT__HDFS__DEFAULT_SCHEME=hdfs
```
PyIceberg will automatically inject these into a catalog with the name default:

```python
load_catalog('default')  # Will pass in `hdfs.default-scheme` above
```
Sorry, I'm not totally following. When is load_catalog called for the client? I don't see it called in table or pyarrow. Are you saying I should call load_catalog there? That doesn't seem obviously correct to me, and my example code doesn't call load_catalog either.
Thanks for sticking with us here, @mccormickt12; I think there is some miscommunication. How are you opening up the catalog? load_catalog is the recommended way of doing this: https://py.iceberg.apache.org/api/

Let's consider the following parse_location:
```python
def parse_location(self, location: str) -> Tuple[str, str, str]:
    """Return (scheme, netloc, path) for the given location.

    Falls back to the default-scheme and default-netloc properties
    when scheme/netloc are missing.
    """
    uri = urlparse(location)
    scheme = uri.scheme or self.properties.get("default-scheme")
    netloc = uri.netloc or self.properties.get("default-netloc")
    if scheme in ("hdfs", "viewfs"):
        return scheme, netloc, uri.path
    else:
        # For non-HDFS URIs, include netloc in the path if present
        path = uri.path if uri.scheme else os.path.abspath(location)
        if netloc and not path.startswith(netloc):
            path = f"{netloc}{path}"
        return scheme, netloc, path
```
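To make the behavior concrete, here are a few illustrative calls against this sketch (hostnames are placeholders):

```python
# Assuming io.properties = {"default-scheme": "hdfs",
#                           "default-netloc": "namenode:9000"}:

io.parse_location("/warehouse/tbl/data.parquet")
# -> ("hdfs", "namenode:9000", "/warehouse/tbl/data.parquet")

io.parse_location("hdfs://other:8020/tmp/f")
# -> ("hdfs", "other:8020", "/tmp/f")

io.parse_location("s3://bucket/key/file.parquet")
# -> ("s3", "bucket", "bucket/key/file.parquet")
```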
You can inject the properties through:

```python
load_catalog('default', properties={
    'default-scheme': 'hdfs',
    'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
})
```
If you use load_catalog, it will also pick up the configuration file and the environment variables:

```yaml
catalog:
  default:
    default-scheme: hdfs
    default-netloc: ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000
```

```shell
export PYICEBERG_CATALOG__DEFAULT__HDFS__DEFAULT_SCHEME=hdfs
```
Or use the FileIO directly:

```python
PyArrowFileIO(properties={
    'default-scheme': 'hdfs',
    'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
})
```
What do you think? Does this align with the way you're using PyIceberg?
Or, in the case of a StaticTable:

```python
static_table = StaticTable.from_metadata(
    "hdfs://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json",
    properties={
        'default-scheme': 'hdfs',
        'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
    },
)
```
@Fokko to be clear, the property name should be just default-scheme, not hdfs.default-scheme. Again, the point is that we need a global default that says which scheme to use when there is none.
@mccormickt12 Yes, you're right :)
@mccormickt12 I think we're in agreement here, are you interested in updating the PR so we can get this into main?
Updated the comment to reflect the usage and updated the PR. @Fokko I think you might need to kick off the tests.
pyiceberg/io/pyarrow.py (Outdated)
```diff
  # Apply logic
  scheme = uri.scheme or default_scheme
  netloc = uri.netloc or default_netloc

  if scheme in ("hdfs", "viewfs"):
      return scheme, netloc, uri.path
  else:
-     return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+     # For non-HDFS URIs, include netloc in the path if present
+     path = uri.path if uri.scheme else os.path.abspath(location)
+     if netloc and not path.startswith(netloc):
+         path = f"{netloc}{path}"
+     return scheme, netloc, path
```
I actually really want to get rid of this if {scheme} logic here. Is there a way to refactor these changes down into _initialize_hdfs_fs, so we can keep the hdfs logic in the same place?
I don't see a nice way to do this, since the path used in the PyArrowFile is actually different in the different cases. I tried to see if we could use the same path with the netloc in it for hdfs, but it doesn't seem to work: #2291 (comment)

This shows that setting the netloc on filesystem creation and also having it in the path (as is done for the other fs types) doesn't work for hdfs.
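For illustration, a minimal sketch of that failure mode with pyarrow's HadoopFileSystem; the namenode host/port are placeholders and assume a reachable, configured HDFS client:

```python
from pyarrow.fs import HadoopFileSystem

# Placeholder namenode; requires a working HDFS client environment.
fs = HadoopFileSystem(host="namenode", port=9000)

# Plain absolute path: resolves as expected.
fs.get_file_info("/warehouse/tbl/data.parquet")

# Netloc embedded in the path, as parse_location does for other
# filesystems: HDFS does not resolve this to the intended file.
fs.get_file_info("namenode:9000/warehouse/tbl/data.parquet")
```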
> What is the proper way to address an absolute path in HadoopFileSystem? Your example shows that …

I'm trying to see what the requirements are here. I only found examples with … Also I'm curious if …

> It seems to not like the URI passed in

This is actually a syntax problem; I also ran into this while testing. Instead of …
Summarizing my thoughts a bit. For context, here's the way a URL is parsed today when interacting with the PyArrow FileIO.
What would be helpful here is to figure out the expected format for the … If it includes the hdfs host name and port, i.e. …

If it does not include the hdfs host name and port, i.e. …

Right now there are 2 places in the codebase where we can make changes to the FileIO behavior. I would like to avoid adding more switch cases in …
Caught up with @mccormickt12 offline. Here's the issue right now: iceberg-python/pyiceberg/io/pyarrow.py, lines 395 to 399 in a7f6c08.

We need to also support …
```python
path = uri.path if uri.scheme else os.path.abspath(location)
if netloc and not path.startswith(netloc):
    path = f"{netloc}{path}"
return scheme, netloc, path
```
I don't remember the purpose of changing the logic inside the else block; I thought we only needed to change the case where uri.scheme is empty. What do you think about this?
```python
if not uri.scheme:
    # If no scheme is provided, default to file:// (local filesystem).
    # If properties contains DEFAULT_SCHEME, use that scheme instead of "file".
    default_scheme = properties.get("DEFAULT_SCHEME", "file")
    default_netloc = properties.get("DEFAULT_NETLOC", uri.netloc)
    return default_scheme, default_netloc, os.path.abspath(location)
elif uri.scheme in ("hdfs", "viewfs"):
    return uri.scheme, uri.netloc, uri.path
else:
    return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
```
This assumes the default scheme will only ever be hdfs. I think it's better to leave this generic, as before, so that other schemes can be the default and still work.
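To make that concern concrete, a sketch of how the two versions diverge for a non-HDFS default (the property names follow the snippet above, not the merged code):

```python
# Hypothetical properties with a non-HDFS default scheme:
properties = {"DEFAULT_SCHEME": "s3", "DEFAULT_NETLOC": "bucket"}

# For location = "/warehouse/tbl/f":
# proposal above:  ("s3", "bucket", "/warehouse/tbl/f")
#                  (the netloc is never folded into the path)
# generic version: ("s3", "bucket", "bucket/warehouse/tbl/f")
#                  (matches the non-HDFS convention used elsewhere)
```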
Rationale for this change
For HDFS it's common to get the scheme and netloc from config and have paths be just the URI path. Add configurable defaults to support this case.
Example, when using a catalog (sketched below):
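A sketch reconstructed from the catalog configuration discussed above; the netloc value is a placeholder:

```yaml
catalog:
  default:
    default-scheme: hdfs
    default-netloc: namenode.example.com:9000
```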
Or, if not using a catalog (sketched below):
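A sketch reconstructed from the FileIO example discussed above; the netloc value is a placeholder:

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO(properties={
    "default-scheme": "hdfs",
    "default-netloc": "namenode.example.com:9000",
})
```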
Are these changes tested?
Tested in a test environment at LinkedIn and with unit tests.
Are there any user-facing changes?
No user-facing changes by default. If you set these defaults and a file path doesn't have a scheme/netloc, the specified defaults will be used.