Skip to content

Conversation

mccormickt12
Copy link

@mccormickt12 mccormickt12 commented Aug 6, 2025

Rationale for this change

For hdfs it's common to get scheme and netloc from config and have paths be just the uri. Add environment variables to support this case.

example

tmccormi@ltx1-hcl14866 [ ~/python ]$ export PYICEBERG_CATALOG__DEFAULT__DEFAULT_SCHEME=hdfs
tmccormi@ltx1-hcl14866 [ ~/python ]$ export PYICEBERG_CATALOG__DEFAULT__DEFAULT_NETLOC=ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000

or if not using catalog

static_table = StaticTable.from_metadata(
    "hdfs://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
     properties={
        'DEFAULT_SCHEME': 'hdfs',
        'DEFAULT_NETLOC': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
    }
)

Are these changes tested?

Tested in test env at linkedin and with unit tests

Are there any user-facing changes?

No user facing changes by default. If you add these env variables, if file path doesn't have scheme/netloc it'll use the defaults specified.

return uri.scheme, uri.netloc, uri.path

# Load defaults from environment
default_scheme = os.getenv("DEFAULT_SCHEME", "file")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use central config instead of direct usage of env variables? pyiceberg/utils/config.py

this would enable configuration via file OR env variables, which is how most other configs are documented and exposed to catalog construction.

@cbb330
Copy link

cbb330 commented Aug 6, 2025

thanks Tom! overall excited for this PR because I've had to hack around this with an overriden PyArrowFileIO e.g.

class HDFSFileIO(PyArrowFileIO):
    """Simple PyArrowFileIO that defaults paths without scheme to HDFS"""

    @override
    def new_input(self, location: str) -> PyArrowFile:
        """Fix paths without scheme to use HDFS"""
        if not urlparse(location).scheme and location.startswith('/'):
            hdfs_host = self.properties.get(HDFS_HOST, 'localhost')
            hdfs_port = self.properties.get(HDFS_PORT, '9000')
            location = f'hdfs://{hdfs_host}:{hdfs_port}{location}'
        return super().new_input(location)

Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I added a comment about passing the properties and structuring the specific code for hdfs

default_scheme = config.get_str("default-scheme") or "file"
default_netloc = config.get_str("default-netloc") or ""

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i think its better to pass these in through the properties field
https://py.iceberg.apache.org/configuration/#hdfs

we can get the env variable and then pass into the properties.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @kevinjqliu that the properties is more appropriate since this will allow setting it for each catalog.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would look something like:

PyArrowFileIO(**{
    'hdfs.default-scheme': 'hdfs',
    'hdfs.default-netloc': 'netloc:8000',
})

Or you can set them through environment variables:

export PYICEBERG_CATALOG__DEFAULT__HDFS__DEFAULT_SCHEME=hdfs

PyIceberg will automatically inject these into a catalog with the name default:

load_catalog('default')  # Will pass in `hdfs.default-scheme` above

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry im not totally following. When is load_catalog called for client? I don't see it called in table or pyarrow. Are you saying i should load_catalog there? That doesn't seem obviously correct to me, and my example code doesn't call load_catalog either.

Copy link
Contributor

@Fokko Fokko Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sticking with us @mccormickt12 here, I think there is some miscommunication.

How are you opening up the catalog? load_catalog is the recommended way of doing this: https://py.iceberg.apache.org/api/

Let's consider the following parse_location:

    def parse_location(self, location: str) -> Tuple[str, str, str]:
        """Return (scheme, netloc, path) for the given location.

        Uses environment variables default-scheme and default-netloc
        if scheme/netloc are missing.
        """
        uri = urlparse(location)

        # Apply logic
        scheme = uri.scheme or self.properties.get("default-scheme")
        netloc = uri.netloc or self.properties.get("default-netloc")

        if scheme in ("hdfs", "viewfs"):
            return scheme, netloc, uri.path
        else:
            # For non-HDFS URIs, include netloc in the path if present
            path = uri.path if uri.scheme else os.path.abspath(location)
            if netloc and not path.startswith(netloc):
                path = f"{netloc}{path}"
            return scheme, netloc, path

You can inject the properties through:

load_catalog('default', properties={
    'default-scheme': 'hdfs',
    'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
})

If you use load_catalog, it will also pick up the configuration and the environment variables:

catalog:
  default:
    default-scheme: hdfs
    default-netloc: ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000
export PYICEBERG_CATALOG__DEFAULT__HDFS__DEFAULT_SCHEME=hdfs

Or use the FileIO directly:

PyArrowFileIO(properties={
    'default-scheme': 'hdfs',
    'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
})

What do you think? Does this align with the way you're using PyIceberg?

Copy link
Contributor

@Fokko Fokko Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, in the case of a StaticTable:

static_table = StaticTable.from_metadata(
    "hdfs://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
     properties={
        'default-scheme': 'hdfs',
        'default-netloc': 'ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000',
    }
)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko to be clear, the property name should be just default-scheme not hdfs.default-scheme again, the point is we need a global default, to say if there is no scheme, which to use

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mccormickt12 Yes, you're right :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mccormickt12 I think we're in agreement here, are you interested in updating the PR so we can get this into main?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the comment to reflect the usage and updated the PR
@Fokko i think you might need to kick off the tests

Comment on lines 404 to 413
# Apply logic
scheme = uri.scheme or default_scheme
netloc = uri.netloc or default_netloc

if scheme in ("hdfs", "viewfs"):
return scheme, netloc, uri.path
else:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
# For non-HDFS URIs, include netloc in the path if present
path = uri.path if uri.scheme else os.path.abspath(location)
if netloc and not path.startswith(netloc):
path = f"{netloc}{path}"
return scheme, netloc, path
Copy link
Contributor

@kevinjqliu kevinjqliu Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i actually really want to get rid of this if {scheme} logic here.

Is there a way to refactor these changes down to the _initialize_hdfs_fs? so we can keep the hdfs logic in the same place?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't see a nice way to do this since the path used in the pyarrowfile is actually different in the different cases, i tried to see if we could use the same path with netloc in it for hdfs but it doesn't seem to work
#2291 (comment)

@mccormickt12
Copy link
Author

this shows that setting the netloc on filesystem creation and having it in the path (as is done for the other fs types) doesn't work for hdfs

>>> hdfs = fs.HadoopFileSystem(host='ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com', port=9000)
25/08/07 17:21:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> table_base = "/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567"
>>> long_table_base = "ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567"
>>> hdfs.get_file_info(fs.FileSelector(table_base))
25/08/07 17:22:00 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
[<FileInfo for '/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567/00000-3ec53886-ceae-46f2-a926-050afb7f95b9.metadata.json': type=FileType.File, size=2900>, <FileInfo for '/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567/00001-fc1f6c92-0449-4deb-8908-097db5f6589a.metadata.json': type=FileType.File, size=4366>, <FileInfo for '/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567/data': type=FileType.Directory>, <FileInfo for '/jobs/openhouse/cutover_zdt_testing_db/cutover_zdt_testing_table_partitioned_one-f814050d-6416-4fa8-ae85-c63ac74b4567/metadata': type=FileType.Directory>]
>>> hdfs.get_file_info(fs.FileSelector(long_table_base))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/_fs.pyx", line 582, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
FileNotFoundError: [Errno 2] HDFS list directory failed. Detail: [errno 2] No such file or directory

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Aug 10, 2025

What is the proper way to address an absolute path in HadoopFileSystem? your example shows that /path/to/file/ works but {host}/path/to/file does not work. Should {host}/path/to/file also work?

Im trying to see what the requirements are here. I only found examples with hdfs://

Also im curious if HadoopFileSystem.from_uri will work for long_table_base

@mccormickt12
Copy link
Author

What is the proper way to address an absolute path in HadoopFileSystem? your example shows that /path/to/file/ works but {host}/path/to/file does not work. Should {host}/path/to/file also work?

Im trying to see what the requirements are here. I only found examples with hdfs://

Also im curious if HadoopFileSystem.from_uri will work for long_table_base

It seems to not like the URI passed in

fs = HadoopFileSystem.from_uri("hdfs://ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000")
path = "/user/tmccormi"
fs.get_file_info(fs.FileSelector(path, recursive=False))
Traceback (most recent call last):
File "", line 1, in
AttributeError: 'pyarrow._hdfs.HadoopFileSystem' object has no attribute 'FileSelector'
fs.get_file_info(path)
25/08/12 18:16:38 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
<FileInfo for '/user/tmccormi': type=FileType.Directory>
print(fs.get_file_info(path))
<FileInfo for '/user/tmccormi': type=FileType.Directory>
path = "hdfs://ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000/user/tmccormi"
print(fs.get_file_info(path))
Traceback (most recent call last):
File "", line 1, in
File "pyarrow/_fs.pyx", line 590, in pyarrow._fs.FileSystem.get_file_info
File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: GetFileInfo must not be passed a URI, got: hdfs://ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000/user/tmccormi

@kevinjqliu
Copy link
Contributor

fs.get_file_info(fs.FileSelector(path, recursive=False))
Traceback (most recent call last):
File "", line 1, in
AttributeError: 'pyarrow._hdfs.HadoopFileSystem' object has no attribute 'FileSelector'

this is actually a syntax problem, i also ran into this while testing. instead of pyarrow.fs which conflicts with the fs object we defined for HadoopFileSystem, can you try

from pyarrow.fs import FileSelector
fs.get_file_info(FileSelector(path, recursive=False))

@kevinjqliu
Copy link
Contributor

Summarizing my thoughts a bit.

For context, heres the way to parse url today when interacting with pyarrow fileio.

  1. A location string is passed to the file io. this is assumed to be an absolute path as required by the iceberg spec. however, "absolute" can mean different things depending on the file system
  2. the location string is then parsed by the PyarrowFileIO's parse_location function. This functions returns the scheme, netloc, and path of the location string. scheme, and netloc is used to determine the FS implementation. path is used by the FS implementation to access the file.

What would be helpful here is to figure out what is the expected format for the location string for HadoopFileSystem.

If it includes the hdfs host name and port, i.e. hdfs://ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000/user/tmccormi, parse_location will return the /user/tmccormi as the path.

from urllib.parse import urlparse
urlparse("hdfs://ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000/user/tmccormi")

ParseResult(scheme='hdfs', netloc='ltx1-yugioh-cluster01.linkfs.prod-ltx1.atd.prod.linkedin.com:9000', path='/user/tmccormi', params='', query='', fragment='')

If it does not include the hdfs host name and port, i.e. /user/tmccormi, parse_location will still return /user/tmccormi as the path. But the scheme is empty.

urlparse("/user/tmccormi")

ParseResult(scheme='', netloc='', path='/user/tmccormi', params='', query='', fragment='')

Right now there are 2 places in the codebase where we can make changes to the fileio behavior.

  1. parse_location
  2. Overload the Filesystem implementation

I would like to avoid adding more switch cases in parse_location for ease of maintenance. Perhaps as an alternative, we can let the specific FileSystem implementation handle how it would like to parse_location
I would also like to keep the current way of configuring fileio with just properties, not properties and configs.

@kevinjqliu
Copy link
Contributor

Caught up with @mccormickt12 offline

Here's the issue right now:
HadoopFileSystem currently supports location string with the hdfs:// prefix. However, /path/to/file is also a valid location string. Right now, parse_location assumes that any location string without scheme is handled by the LocalFilesystem and we even add the file scheme

def parse_location(location: str) -> Tuple[str, str, str]:
"""Return the path without the scheme."""
uri = urlparse(location)
if not uri.scheme:
return "file", uri.netloc, os.path.abspath(location)

We need to also support /path/to/file as valid paths for the HadoopFileSystem 👍

path = uri.path if uri.scheme else os.path.abspath(location)
if netloc and not path.startswith(netloc):
path = f"{netloc}{path}"
return scheme, netloc, path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont remember the purpose of changing the logic inside the else block. i thought we only need to change when uri.scheme is empty

what do you think about this?

        if not uri.scheme:
            # If no scheme is provided, default to file:// (local filesystem).
            # If properties contains DEFAULT_SCHEME, use that scheme instead of "file".
            default_scheme = properties.get("DEFAULT_SCHEME", "file")
            default_netloc = properties.get("DEFAULT_NETLOC", uri.netloc)
            return default_scheme, default_netloc, os.path.abspath(location)
        elif uri.scheme in ("hdfs", "viewfs"):
            return uri.scheme, uri.netloc, uri.path
        else:
            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assumes the default scheme will only ever be hdfs, I think its better to leave this generic and before to allow other schemes to be default and still work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants