diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..a31cba7 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,5 @@ +[target.aarch64-apple-darwin] +rustflags = ["-Clink-arg=-undefined","-Clink-arg=dynamic_lookup"] + +[target.x86_64-apple-darwin] +rustflags = ["-Clink-arg=-undefined","-Clink-arg=dynamic_lookup"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 6cdaad3..81b7a24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,9 @@ name = "anyhow" version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +dependencies = [ + "backtrace", +] [[package]] name = "apache-avro" @@ -277,13 +280,37 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-compression" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977eb15ea9efd848bb8a4a1a2500347ed7f0bf794edf0dc3ddcf439f43d36b23" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.4.0", "event-listener-strategy", "pin-project-lite", ] @@ -309,15 +336,10 @@ dependencies = [ ] [[package]] -name = "atty" -version = "0.2.14" +name = "atomic-waker" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" @@ -325,6 +347,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.16", + "instant", + "rand 0.8.5", +] + [[package]] name = "backon" version = "1.5.2" @@ -377,12 +410,6 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.9.1" @@ -500,25 +527,6 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" -[[package]] -name = "cbindgen" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da6bc11b07529f16944307272d5bd9b22530bc7d05751717c9d416586cedab49" -dependencies = [ - "clap", - "heck 0.4.1", - "indexmap 1.9.3", - "log", - "proc-macro2", - "quote", - "serde", - "serde_json", - "syn 1.0.109", - "tempfile", - "toml", -] - [[package]] name = "cc" version = "1.2.30" @@ -558,29 +566,33 @@ dependencies = [ ] [[package]] -name = "clap" -version = "3.2.25" +name = "cmake" +version = "0.1.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" dependencies = [ - "atty", - "bitflags 1.3.2", - "clap_lex", - "indexmap 1.9.3", - "strsim 0.10.0", - "termcolor", - "textwrap", + "cc", ] [[package]] -name = "clap_lex" -version = "0.2.4" +name = "compression-codecs" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +checksum = "485abf41ac0c8047c07c87c72c8fb3eb5197f6e9d7ded615dfd1a00ae00a0f64" dependencies = [ - "os_str_bytes", + "compression-core", + "flate2", + "memchr", + "zstd", + "zstd-safe", ] +[[package]] +name = "compression-core" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -616,6 +628,16 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -676,6 +698,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -718,7 +749,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim 0.11.1", + "strsim", "syn 2.0.104", ] @@ -739,6 +770,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "deranged" version = "0.4.0" @@ -830,6 +867,24 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -846,6 +901,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.4.0" @@ -863,7 +924,7 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener", + "event-listener 5.4.0", "pin-project-lite", ] @@ -889,7 +950,7 @@ version = "25.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" dependencies = [ - "bitflags 2.9.1", + "bitflags", "rustc_version", ] @@ -900,16 +961,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" dependencies = [ "crc32fast", + "libz-ng-sys", "libz-rs-sys", "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1083,6 +1172,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.10.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.6.0" @@ -1119,27 +1227,12 @@ version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.5.2" @@ -1152,6 +1245,51 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-proto" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand 0.8.5", + "thiserror 1.0.69", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot", + "rand 0.8.5", + "resolv-conf", + "smallvec", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1210,6 +1348,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "1.6.0" @@ -1219,6 +1363,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2", "http", "http-body", "httparse", @@ -1239,6 +1384,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1351,17 +1497,20 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "arrow-array", "arrow-ipc", - "cbindgen", "futures", "iceberg", "libc", + "object_store_ffi", + "once_cell", "tempfile", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1499,6 +1648,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -1511,11 +1669,23 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ - "bitflags 2.9.1", + "bitflags", "cfg-if", "libc", ] +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2 0.5.10", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -1673,6 +1843,16 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libz-ng-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7118c2c2a3c7b6edc279a8b19507672b9c4d716f95e671172dfa4e23f9fd824" +dependencies = [ + "cmake", + "libc", +] + [[package]] name = "libz-rs-sys" version = "0.5.1" @@ -1682,6 +1862,12 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.9.4" @@ -1723,6 +1909,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1763,6 +1958,35 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "metrics" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3045b4193fbdc5b5681f32f11070da9be3609f189a79f3390706d42587f46bb5" +dependencies = [ + "ahash 0.8.12", + "portable-atomic", +] + +[[package]] +name = "metrics-util" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.5", + "indexmap 2.10.0", + "metrics", + "num_cpus", + "ordered-float 4.6.0", + "quanta", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1793,7 +2017,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener", + "event-listener 5.4.0", "futures-util", "loom", "parking_lot", @@ -1811,6 +2035,24 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1908,7 +2150,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi 0.5.2", + "hermit-abi", "libc", ] @@ -1921,6 +2163,80 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.11.2" +source = "git+https://github.com/RelationalAI/arrow-rs.git?tag=v0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "futures", + "httparse", + "humantime", + "hyper", + "itertools", + "md-5", + "parking_lot", + "percent-encoding", + "quick-xml", + "rand 0.8.5", + "reqwest", + "ring", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + +[[package]] +name = "object_store_ffi" +version = "0.12.3" +source = "git+https://github.com/RelationalAI/object_store_ffi?rev=db0f6a3fe282e267a53c119c0aca4b5af341df3f#db0f6a3fe282e267a53c119c0aca4b5af341df3f" +dependencies = [ + "anyhow", + "async-channel", + "async-compression", + "async-trait", + "backoff", + "base64", + "bytes", + "chrono", + "crossbeam-queue", + "flate2", + "flume", + "futures-util", + "hickory-resolver", + "hyper", + "metrics", + "metrics-util", + "moka", + "object_store", + "once_cell", + "openssl", + "pin-project", + "quanta", + "rand 0.8.5", + "regex", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "url", + "uuid", + "walkdir", + "zeroize", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1955,6 +2271,60 @@ dependencies = [ "uuid", ] +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-src" +version = "300.5.2+3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d270b79e2926f5150189d475bc7e9d2c69f9c4697b185fa917d5a32b792d21b4" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1983,12 +2353,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "os_str_bytes" -version = "6.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" - [[package]] name = "overload" version = "0.1.1" @@ -2071,6 +2435,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2163,6 +2547,21 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.1+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -2249,6 +2648,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2308,13 +2717,22 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" dependencies = [ - "bitflags 2.9.1", + "bitflags", ] [[package]] @@ -2435,6 +2853,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", + "h2", + "hickory-resolver", "http", "http-body", "http-body-util", @@ -2443,10 +2863,12 @@ dependencies = [ "hyper-util", "js-sys", "log", + "once_cell", "percent-encoding", "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", @@ -2466,6 +2888,12 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "resolv-conf" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3" + [[package]] name = "ring" version = "0.17.14" @@ -2578,7 +3006,7 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ - "bitflags 2.9.1", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -2599,6 +3027,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2632,6 +3072,24 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "schemars" version = "0.9.0" @@ -2674,6 +3132,29 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "security-framework" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.26" @@ -2727,6 +3208,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -2834,6 +3325,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.10" @@ -2846,6 +3343,27 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "snafu" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "snap" version = "1.1.1" @@ -2872,6 +3390,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2884,12 +3411,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "strsim" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" - [[package]] name = "strsim" version = "0.11.1" @@ -2917,7 +3438,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", "rustversion", @@ -2930,7 +3451,7 @@ version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", "syn 2.0.104", @@ -3009,21 +3530,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - -[[package]] -name = "textwrap" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c13547615a44dc9c452a8a534638acdf07120d4b6847c8178705da06306a3057" - [[package]] name = "thiserror" version = "1.0.69" @@ -3214,15 +3720,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" -dependencies = [ - "serde", -] - [[package]] name = "toml_datetime" version = "0.6.11" @@ -3261,7 +3758,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags 2.9.1", + "bitflags", "bytes", "futures-util", "http", @@ -3292,9 +3789,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -3439,12 +3948,28 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3582,6 +4107,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "widestring" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" + [[package]] name = "winapi" version = "0.3.9" @@ -3715,6 +4246,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3742,6 +4282,21 @@ dependencies = [ "windows-targets 0.53.3", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -3784,6 +4339,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -3796,6 +4357,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -3808,6 +4375,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3832,6 +4405,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -3844,6 +4423,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -3856,6 +4441,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -3868,6 +4459,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -3889,13 +4486,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.1", + "bitflags", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0db631d..962c6e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,28 @@ [package] name = "iceberg_rust_ffi" -version = "0.1.0" +version = "0.2.0" edition = "2021" [lib] name = "iceberg_rust_ffi" crate-type = ["cdylib"] -[build-dependencies] -cbindgen = "0.26" +[features] +default = ["julia"] +julia = [] [dependencies] iceberg = "0.6.0" +object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "db0f6a3fe282e267a53c119c0aca4b5af341df3f", default-features = false } tokio = { version = "1.0", features = ["full"] } -libc = "0.2" futures = "0.3" +libc = "0.2" anyhow = "1.0" arrow-array = "55.2.0" arrow-ipc = "55.2.0" +tracing-subscriber = "0.3" +tracing = "0.1" +once_cell = "1.19" [dev-dependencies] tempfile = "3.0" diff --git a/Makefile b/Makefile index aa7bc28..8e51903 100644 --- a/Makefile +++ b/Makefile @@ -17,17 +17,14 @@ TARGET = local # Default target all: build test -# Generate C header -generate-header: +# Build the Rust library +build-lib: @if [ "$(TARGET)" = "local" ]; then \ - cargo build --release; \ + cargo build --release --no-default-features; \ else \ - cargo build --release --target $(TARGET); \ + cargo build --release --no-default-features --target $(TARGET); \ fi -# Build the Rust library and generate header -build-lib: generate-header - # Build the integration test build-test: build-lib $(CC) $(CFLAGS) -o $(TEST_NAME) $(TEST_SOURCE) $(LDFLAGS) @@ -67,7 +64,6 @@ clean-all: clean help: @echo "Available targets:" @echo " all - Build and run integration test" - @echo " generate-header - Generate C header file using cbindgen" @echo " build-lib - Build only the Rust library" @echo " build-test - Build the integration test (requires library)" @echo " build - Build everything" @@ -78,4 +74,4 @@ help: @echo " clean-all - Clean everything including target directory" @echo " help - Show this help message" -.PHONY: all generate-header build-lib build-test build test clean clean-all help stop-containers run-containers \ No newline at end of file +.PHONY: all build-lib build-test build test clean clean-all help stop-containers run-containers \ No newline at end of file diff --git a/build.rs b/build.rs deleted file mode 100644 index d2de405..0000000 --- a/build.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::env; -use std::path::PathBuf; - -fn main() { - let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); - let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - - let config = cbindgen::Config::default(); - - cbindgen::Builder::new() - .with_crate(&crate_dir) - .with_config(config) - .generate() - .expect("Unable to generate bindings") - .write_to_file(out_dir.join("iceberg_rust_ffi.h")); - - // Note: We're using a manually created header file instead of the cbindgen-generated one - // The cbindgen output is available in the build output directory if needed for reference - println!("cargo:rerun-if-changed=src/lib.rs"); - println!("cargo:rerun-if-changed=include/iceberg_rust_ffi.h"); -} diff --git a/cbindgen.toml b/cbindgen.toml deleted file mode 100644 index a1d446e..0000000 --- a/cbindgen.toml +++ /dev/null @@ -1,33 +0,0 @@ -language = "C" -include_guard = "ICEBERG_RUST_FFI_H" -autogen_warning = "// This file is auto-generated by cbindgen. Do not edit manually." -tab_width = 4 -documentation = true -documentation_style = "doxy" -line_length = 100 -cpp_compat = false -no_includes = true - -[export] -prefix = "iceberg_" -include = ["IcebergTable", "IcebergScan", "ArrowBatch", "IcebergResult"] -rename = { - "IcebergTable" = "IcebergTable", - "IcebergScan" = "IcebergScan", - "ArrowBatch" = "ArrowBatch", - "IcebergResult" = "IcebergResult" -} - -[defines] -"target_os = \"macos\"" = "TARGET_OS_MACOS" -"target_os = \"linux\"" = "TARGET_OS_LINUX" -"target_os = \"windows\"" = "TARGET_OS_WINDOWS" - -[parse] -parse_deps = true -include = ["iceberg_rust_ffi"] -extra_bindings = [] - -[header] -prefix = "#ifndef ICEBERG_RUST_FFI_H\n#define ICEBERG_RUST_FFI_H\n\n#include \n#include \n#include \n\n#ifdef __cplusplus\nextern \"C\" {\n#endif\n\n" -suffix = "\n#ifdef __cplusplus\n}\n#endif\n\n#endif // ICEBERG_RUST_FFI_H\n" \ No newline at end of file diff --git a/include/iceberg_rust_ffi.h b/include/iceberg_rust_ffi.h index 7fb7a10..5cbed86 100644 --- a/include/iceberg_rust_ffi.h +++ b/include/iceberg_rust_ffi.h @@ -11,49 +11,101 @@ extern "C" { // Forward declarations typedef struct IcebergTable IcebergTable; -typedef struct IcebergScan IcebergScan; +typedef struct Context Context; + +// Configuration for iceberg runtime +typedef struct { + size_t n_threads; +} IcebergStaticConfig; + +// Result types +typedef enum { + CRESULT_OK = 0, + CRESULT_ERROR = 1 +} CResult; // Arrow batch as serialized bytes -typedef struct ArrowBatch { - const uint8_t* data; // Pointer to serialized Arrow IPC data - size_t length; // Length of the data in bytes - void* rust_ptr; // Internal Rust pointer for memory management +typedef struct { + const uint8_t* data; + size_t length; + void* rust_ptr; } ArrowBatch; -typedef enum { - ICEBERG_OK = 0, - ICEBERG_ERROR = -1, - ICEBERG_NULL_POINTER = -2, - ICEBERG_IO_ERROR = -3, - ICEBERG_INVALID_TABLE = -4, - ICEBERG_END_OF_STREAM = -5 -} IcebergResult; - -// Table operations -IcebergResult iceberg_table_open(const char* table_path, const char* metadata_path, IcebergTable** table); +// Response structures for async operations +typedef struct { + CResult result; + IcebergTable* table; + char* error_message; + const Context* context; +} IcebergTableResponse; + +typedef struct IcebergScan IcebergScan; + +typedef struct { + CResult result; + IcebergScan* scan; + char* error_message; + const Context* context; +} IcebergScanResponse; + +typedef struct { + void *stream; +} IcebergArrowStream; + +typedef struct { + CResult result; + IcebergArrowStream* stream; + char* error_message; + const Context* context; +} IcebergArrowStreamResponse; + +typedef struct { + CResult result; + char* error_message; + const Context* context; +} IcebergResponse; + +typedef struct { + CResult result; + ArrowBatch* batch; + char* error_message; + const Context* context; +} IcebergBatchResponse; + +// Callback types +typedef int (*PanicCallback)(void); +typedef int (*ResultCallback)(const void* task); + +// Runtime initialization +CResult iceberg_init_runtime(IcebergStaticConfig config, PanicCallback panic_callback, ResultCallback result_callback); + +// Async table operations +CResult iceberg_table_open(const char* table_path, const char* metadata_path, IcebergTableResponse* response, const void* handle); void iceberg_table_free(IcebergTable* table); -// Scan operations -IcebergResult iceberg_table_scan(IcebergTable* table, IcebergScan** scan); -IcebergResult iceberg_scan_select_columns(IcebergScan* scan, const char** column_names, size_t num_columns); -void iceberg_scan_free(IcebergScan* scan); +// Synchronous scan creation +IcebergScan* iceberg_new_scan(IcebergTable* table); +int iceberg_select_columns(IcebergScan** scan, const char** column_names, size_t num_columns); +int iceberg_scan_build(IcebergScan** scan); +int iceberg_scan_with_data_file_concurrency_limit(IcebergScan** scan, size_t n); +int iceberg_scan_with_manifest_entry_concurrency_limit(IcebergScan** scan, size_t n); +int iceberg_scan_with_batch_size(IcebergScan** scan, size_t n); +void iceberg_scan_free(IcebergScan** scan); -// Arrow batch operations -IcebergResult iceberg_scan_next_batch(IcebergScan* scan, ArrowBatch** batch); +// Async streaming API +CResult iceberg_arrow_stream(IcebergScan* scan, IcebergArrowStreamResponse* response, const void* handle); +CResult iceberg_next_batch(IcebergArrowStream* stream, IcebergBatchResponse* response, const void* handle); +void iceberg_arrow_stream_free(IcebergArrowStream* stream); void iceberg_arrow_batch_free(ArrowBatch* batch); -// Error handling -const char* iceberg_error_message(); - -// Function pointer typedefs for dynamic loading -typedef IcebergResult (*iceberg_table_open_func_t)(const char* table_path, const char* metadata_path, IcebergTable** table); -typedef void (*iceberg_table_free_func_t)(IcebergTable* table); -typedef IcebergResult (*iceberg_table_scan_func_t)(IcebergTable* table, IcebergScan** scan); -typedef IcebergResult (*iceberg_scan_select_columns_func_t)(IcebergScan* scan, const char** column_names, size_t num_columns); -typedef void (*iceberg_scan_free_func_t)(IcebergScan* scan); -typedef IcebergResult (*iceberg_scan_next_batch_func_t)(IcebergScan* scan, ArrowBatch** batch); -typedef void (*iceberg_arrow_batch_free_func_t)(ArrowBatch* batch); -typedef const char* (*iceberg_error_message_func_t)(); +// Utility functions +CResult iceberg_destroy_cstring(char* string); +const char* iceberg_current_metrics(void); + +// Context management functions for cancellation support +CResult iceberg_cancel_context(const Context* ctx); +CResult iceberg_destroy_context(const Context* ctx); + #ifdef __cplusplus } diff --git a/run_integration_test.sh b/run_integration_test.sh index 1ffca99..c881531 100755 --- a/run_integration_test.sh +++ b/run_integration_test.sh @@ -46,9 +46,9 @@ if [ -f ".env" ]; then set +a fi -# Step 1: Build the Rust library +# Step 1: Build the Rust library (without julia feature for standalone C integration) print_status "Building Rust library..." -if cargo build; then +if cargo build --no-default-features; then print_success "Rust library built successfully" else print_error "Failed to build Rust library" @@ -74,7 +74,7 @@ print_status "Using library from: $LIB_PATH" # Step 2: Build the integration test print_status "Building integration test..." -if gcc -o integration_test tests/integration_test.c -Iinclude -L"$LIB_PATH" -liceberg_rust_ffi -lpthread -ldl -lm; then +if gcc -Wall -Wextra -o integration_test tests/integration_test.c -Iinclude -L"$LIB_PATH" -liceberg_rust_ffi -lpthread -ldl -lm; then print_success "Integration test built successfully" else print_error "Failed to build integration test" @@ -84,7 +84,24 @@ fi # Step 3: Run the integration test print_status "Running integration test..." echo "==========================================" -if ./integration_test; then + +# Determine the exact library filename +LIBRARY="" +if [ -f "$LIB_PATH/libiceberg_rust_ffi.dylib" ]; then + LIBRARY="$LIB_PATH/libiceberg_rust_ffi.dylib" +elif [ -f "$LIB_PATH/libiceberg_rust_ffi.so" ]; then + LIBRARY="$LIB_PATH/libiceberg_rust_ffi.so" +else + print_error "Could not find dynamic library" + exit 1 +fi + +print_status "Using library: $LIBRARY" +# Pass through RUST_LOG environment variable if set +if [ -n "$RUST_LOG" ]; then + export RUST_LOG="$RUST_LOG" +fi +if ./integration_test "$LIBRARY"; then echo "==========================================" print_success "Integration test completed successfully!" else diff --git a/src/lib.rs b/src/lib.rs index ce10979..85d0187 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,90 +1,221 @@ -use std::ffi::{CStr, CString}; -use std::os::raw::c_char; +use futures::TryStreamExt; +use std::ffi::{c_char, c_void, CStr}; use std::ptr; -use std::sync::Mutex; -use std::sync::OnceLock; +use tokio::sync::Mutex as AsyncMutex; use anyhow::Result; use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; -use futures::stream::StreamExt; use iceberg::io::FileIOBuilder; -use iceberg::table::StaticTable; +use iceberg::scan::{TableScan, TableScanBuilder}; +use iceberg::table::{StaticTable, Table}; use iceberg::TableIdent; -use std::env; -use tokio::runtime::Runtime; -// cbindgen annotations -#[allow(non_camel_case_types)] -#[allow(non_snake_case)] - -// Internal structures for Rust implementation -struct IcebergTableInternal { - table: iceberg::table::Table, +// Import from object_store_ffi +use object_store_ffi::{ + cancel_context, current_metrics, destroy_context, destroy_cstring, export_runtime_op, + with_cancellation, CResult, Context, NotifyGuard, RawResponse, ResponseGuard, ResultCallback, + RESULT_CB, RT, +}; + +// We use `jl_adopt_thread` to ensure Rust can call into Julia when notifying +// the Base.Event that is waiting for the Rust result. +// Note that this will be linked in from the Julia process, we do not try +// to link it while building this Rust lib. +#[cfg(feature = "julia")] +extern "C" { + fn jl_adopt_thread() -> i32; + fn jl_gc_safe_enter() -> i32; + fn jl_gc_disable_finalizers_internal() -> c_void; } -struct IcebergScanInternal { - table: Option, - columns: Option>, - stream: Option>>>, +// Simple response type for operations that only need success/failure status +#[repr(C)] +pub struct IcebergResponse { + result: CResult, + error_message: *mut c_char, + context: *const Context, } -// Global Tokio runtime using OnceLock for thread safety -// TODO: Might want to share tokio runtime between here and object_store_ffi.jl, e.g., -// by passing object store in and using its runtime. -static RUNTIME: OnceLock = OnceLock::new(); +unsafe impl Send for IcebergResponse {} -fn get_runtime() -> &'static Runtime { - RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")) -} +impl RawResponse for IcebergResponse { + type Payload = (); + + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } -// Thread-local error storage -thread_local! { - static LAST_ERROR: std::cell::RefCell> = std::cell::RefCell::new(None); + fn set_payload(&mut self, _payload: Option) { + // No payload for simple response + } } -fn set_error(error: String) { - LAST_ERROR.with(|e| { - *e.borrow_mut() = Some(error); - }); +// Callback types for Julia integration +type PanicCallback = unsafe extern "C" fn() -> i32; + +// Simple config for iceberg - only what we need +#[derive(Copy, Clone)] +#[repr(C)] +pub struct IcebergStaticConfig { + n_threads: usize, } -fn clear_error() { - LAST_ERROR.with(|e| { - *e.borrow_mut() = None; - }); +impl Default for IcebergStaticConfig { + fn default() -> Self { + IcebergStaticConfig { + n_threads: 0, // 0 means use tokio's default + } + } } -// C API structures +// Direct structures - no opaque wrappers #[repr(C)] pub struct IcebergTable { - _private: [u8; 0], // Opaque type for C + pub table: Table, } #[repr(C)] pub struct IcebergScan { - _private: [u8; 0], // Opaque type for C + pub builder: Option>, + pub scan: Option, +} + +unsafe impl Send for IcebergScan {} + +// Stream wrapper for FFI - using async mutex to avoid blocking calls +#[repr(C)] +pub struct IcebergArrowStream { + pub stream: + AsyncMutex>>, } +unsafe impl Send for IcebergArrowStream {} + #[repr(C)] pub struct ArrowBatch { pub data: *const u8, pub length: usize, - pub rust_ptr: *mut std::ffi::c_void, + pub rust_ptr: *mut c_void, +} + +// Response types for async operations +#[repr(C)] +pub struct IcebergTableResponse { + result: CResult, + table: *mut IcebergTable, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergTableResponse {} + +impl RawResponse for IcebergTableResponse { + type Payload = IcebergTable; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload { + Some(table) => { + let table_ptr = Box::into_raw(Box::new(table)); + self.table = table_ptr; + } + None => self.table = ptr::null_mut(), + } + } +} + +#[repr(C)] +pub struct IcebergArrowStreamResponse { + result: CResult, + stream: *mut IcebergArrowStream, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergArrowStreamResponse {} + +impl RawResponse for IcebergArrowStreamResponse { + type Payload = IcebergArrowStream; + + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + + fn set_payload(&mut self, payload: Option) { + match payload { + Some(stream) => { + self.stream = Box::into_raw(Box::new(stream)); + } + None => self.stream = ptr::null_mut(), + } + } } #[repr(C)] -pub enum IcebergResult { - IcebergOk = 0, - IcebergError = -1, - IcebergNullPointer = -2, - IcebergIoError = -3, - IcebergInvalidTable = -4, - IcebergEndOfStream = -5, +pub struct IcebergBatchResponse { + result: CResult, + batch: *mut ArrowBatch, + error_message: *mut c_char, + context: *const Context, +} + +unsafe impl Send for IcebergBatchResponse {} + +impl RawResponse for IcebergBatchResponse { + type Payload = Option; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload.flatten() { + Some(batch) => { + let arrow_batch = serialize_record_batch(batch); + match arrow_batch { + Ok(arrow_batch) => { + self.batch = Box::into_raw(Box::new(arrow_batch)); + } + Err(_) => { + self.batch = ptr::null_mut(); + } + } + } + None => self.batch = ptr::null_mut(), + } + } } // Helper function to create ArrowBatch from RecordBatch -// TODO: This should be zero-copy... +// TODO: Switch to zero-copy once Arrow.jl supports C API. fn serialize_record_batch(batch: RecordBatch) -> Result { let buffer = Vec::new(); let mut stream_writer = StreamWriter::try_new(buffer, &batch.schema())?; @@ -95,7 +226,7 @@ fn serialize_record_batch(batch: RecordBatch) -> Result { let boxed_data = Box::new(serialized_data); let data_ptr = boxed_data.as_ptr(); let length = boxed_data.len(); - let rust_ptr = Box::into_raw(boxed_data) as *mut std::ffi::c_void; + let rust_ptr = Box::into_raw(boxed_data) as *mut c_void; Ok(ArrowBatch { data: data_ptr, @@ -104,293 +235,367 @@ fn serialize_record_batch(batch: RecordBatch) -> Result { }) } -// C API functions +// Initialize runtime - configure RT and RESULT_CB directly #[no_mangle] -pub extern "C" fn iceberg_table_open( - table_path: *const c_char, - metadata_path: *const c_char, - table: *mut *mut IcebergTable, -) -> IcebergResult { - if table_path.is_null() || metadata_path.is_null() || table.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; - } - - clear_error(); - - let path_str = unsafe { - match CStr::from_ptr(table_path).to_str() { - Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in table path: {}", e)); - return IcebergResult::IcebergError; - } - } - }; +pub extern "C" fn iceberg_init_runtime( + config: IcebergStaticConfig, + panic_callback: PanicCallback, + result_callback: ResultCallback, +) -> CResult { + // Set the result callback + if let Err(_) = RESULT_CB.set(result_callback) { + return CResult::Error; // Already initialized + } - let metadata_path_str = unsafe { - match CStr::from_ptr(metadata_path).to_str() { - Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in metadata path: {}", e)); - return IcebergResult::IcebergError; - } + // Set up panic hook + let prev = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + prev(info); + unsafe { panic_callback() }; + })); + + // Set up logging if not already configured + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "iceberg_rust_ffi=warn,iceberg=warn") } + } + + // Initialize tracing subscriber + let _ = tracing_subscriber::fmt::try_init(); + + // Build tokio runtime + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); + rt_builder.enable_all(); + + // Configure Julia thread adoption for Julia integration + rt_builder.on_thread_start(|| { + #[cfg(feature = "julia")] + { + unsafe { jl_adopt_thread() }; + unsafe { jl_gc_safe_enter() }; + unsafe { jl_gc_disable_finalizers_internal() }; } + }); + + if config.n_threads > 0 { + rt_builder.worker_threads(config.n_threads); + } + + let runtime = match rt_builder.build() { + Ok(rt) => rt, + Err(_) => return CResult::Error, }; - // TODO: Perhaps we should have full asynchronicity that includes the caller code (e.g. Julia) instead of blocking here. - let result: Result = get_runtime().block_on(async { - // println!("DEBUG: Table path: {}", path_str); - // println!("DEBUG: Metadata path: {}", metadata_path_str); + if RT.set(runtime).is_err() { + return CResult::Error; + } + + CResult::Ok +} + +// Use export_runtime_op! macro for table opening +export_runtime_op!( + iceberg_table_open, + IcebergTableResponse, + || { + let table_path_str = unsafe { + CStr::from_ptr(table_path).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in table path: {}", e))? + }; + let metadata_path_str = unsafe { + CStr::from_ptr(metadata_path).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in metadata path: {}", e))? + }; + Ok((table_path_str.to_string(), metadata_path_str.to_string())) + }, + paths, + async { + let (table_path_str, metadata_path_str) = paths; - // Construct the full S3 path by combining table_path and metadata_path + // Construct the full metadata path let full_metadata_path = if metadata_path_str.starts_with('/') { - // If metadata_path starts with /, it's absolute, so use it as is - metadata_path_str.to_string() + metadata_path_str } else { - // Otherwise, combine table_path with metadata_path - let table_path_trimmed = path_str.trim_end_matches('/'); + let table_path_trimmed = table_path_str.trim_end_matches('/'); let metadata_path_trimmed = metadata_path_str.trim_start_matches('/'); format!("{}/{}", table_path_trimmed, metadata_path_trimmed) }; - // println!("DEBUG: Full metadata file path: {}", full_metadata_path); - - let _ = env::var("AWS_ACCESS_KEY_ID").expect("AWS_ACCESS_KEY_ID must be set"); - // Create file IO for S3 let file_io = FileIOBuilder::new("s3").build()?; // Create table identifier let table_ident = TableIdent::from_strs(["default", "table"])?; + // Load the static table + tracing::info!("Loading static table from metadata path: {}", full_metadata_path); let static_table = StaticTable::from_metadata_file(&full_metadata_path, table_ident, file_io).await?; - let iceberg_table = static_table.into_table(); - - Ok(iceberg_table) - }); - - match result { - Ok(iceberg_table) => { - let table_ptr = Box::into_raw(Box::new(IcebergTableInternal { - table: iceberg_table, - })); - unsafe { - *table = table_ptr as *mut IcebergTable; - } - IcebergResult::IcebergOk - } - Err(e) => { - set_error(format!("Failed to open table: {}", e)); - IcebergResult::IcebergError - } - } -} - -#[no_mangle] -pub extern "C" fn iceberg_table_free(table: *mut IcebergTable) { - if !table.is_null() { - unsafe { - let _ = Box::from_raw(table as *mut IcebergTableInternal); - } - } -} + tracing::info!("Successfully loaded static table, converting to table"); + Ok::(IcebergTable { table: static_table.into_table() }) + }, + table_path: *const c_char, + metadata_path: *const c_char +); #[no_mangle] -pub extern "C" fn iceberg_table_scan( - table: *mut IcebergTable, - scan: *mut *mut IcebergScan, -) -> IcebergResult { - if table.is_null() || scan.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +pub extern "C" fn iceberg_new_scan(table: *mut IcebergTable) -> *mut IcebergScan { + if table.is_null() { + return ptr::null_mut(); } - - clear_error(); - - let table_ref = unsafe { &*(table as *const IcebergTableInternal) }; - - let scan_ptr = Box::into_raw(Box::new(IcebergScanInternal { - table: Some(table_ref.table.clone()), - columns: None, - stream: None, + let table_ref = unsafe { &*table }; + let scan_builder = table_ref.table.scan(); + return Box::into_raw(Box::new(IcebergScan { + builder: Some(scan_builder), + scan: None, })); - - unsafe { - *scan = scan_ptr as *mut IcebergScan; - } - - IcebergResult::IcebergOk } #[no_mangle] -pub extern "C" fn iceberg_scan_select_columns( - scan: *mut IcebergScan, +pub extern "C" fn iceberg_select_columns( + scan: &mut *mut IcebergScan, column_names: *const *const c_char, num_columns: usize, -) -> IcebergResult { - if scan.is_null() || column_names.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +) -> CResult { + if scan.is_null() || (*scan).is_null() || column_names.is_null() { + return CResult::Error; } - clear_error(); - - let scan_ref = unsafe { &mut *(scan as *mut IcebergScanInternal) }; - let mut columns = Vec::new(); for i in 0..num_columns { let col_ptr = unsafe { *column_names.add(i) }; if col_ptr.is_null() { - set_error("Null column name pointer".to_string()); - return IcebergResult::IcebergNullPointer; + return CResult::Error; } let col_str = unsafe { match CStr::from_ptr(col_ptr).to_str() { Ok(s) => s, - Err(e) => { - set_error(format!("Invalid UTF-8 in column name: {}", e)); - return IcebergResult::IcebergError; - } + Err(_) => return CResult::Error, } }; - columns.push(col_str.to_string()); } - scan_ref.columns = Some(columns); + let scan_ref = unsafe { Box::from_raw(*scan) }; + + if scan_ref.builder.is_none() { + return CResult::Error; + } + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref.builder.map(|b| b.select(columns)), + scan: scan_ref.scan, + })); - IcebergResult::IcebergOk + return CResult::Ok; } #[no_mangle] -pub extern "C" fn iceberg_scan_free(scan: *mut IcebergScan) { - if !scan.is_null() { - unsafe { - let _ = Box::from_raw(scan as *mut IcebergScanInternal); - } +pub extern "C" fn iceberg_scan_with_data_file_concurrency_limit( + scan: &mut *mut IcebergScan, + n: usize, +) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; } + let scan_ref = unsafe { Box::from_raw(*scan) }; + + if scan_ref.builder.is_none() { + return CResult::Error; + } + + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref + .builder + .map(|b| b.with_data_file_concurrency_limit(n)), + scan: scan_ref.scan, + })); + + return CResult::Ok; } #[no_mangle] -pub extern "C" fn iceberg_scan_next_batch( - scan: *mut IcebergScan, - batch: *mut *mut ArrowBatch, -) -> IcebergResult { - if scan.is_null() || batch.is_null() { - set_error("Null pointer provided".to_string()); - return IcebergResult::IcebergNullPointer; +pub extern "C" fn iceberg_scan_with_manifest_entry_concurrency_limit( + scan: &mut *mut IcebergScan, + n: usize, +) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; } + let scan_ref = unsafe { Box::from_raw(*scan) }; - clear_error(); + if scan_ref.builder.is_none() { + return CResult::Error; + } - let scan_ref = unsafe { &mut *(scan as *mut IcebergScanInternal) }; + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref + .builder + .map(|b| b.with_manifest_entry_concurrency_limit(n)), + scan: scan_ref.scan, + })); - // Initialize stream if not already done - if scan_ref.stream.is_none() { - if let Some(table) = &scan_ref.table { - let columns = scan_ref.columns.clone(); - let stream_result = get_runtime().block_on(async { - let mut scan_builder = table.scan(); + return CResult::Ok; +} - if let Some(cols) = columns { - scan_builder = scan_builder.select(cols); - } +#[no_mangle] +pub extern "C" fn iceberg_scan_with_batch_size(scan: &mut *mut IcebergScan, n: usize) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let scan_ref = unsafe { Box::from_raw(*scan) }; - match scan_builder.build() { - Ok(table_scan) => match table_scan.to_arrow().await { - Ok(stream) => Ok(stream), - Err(e) => { - set_error(format!("Failed to create arrow stream: {}", e)); - Err(e) - } - }, - Err(e) => { - set_error(format!("Failed to build scan: {}", e)); - Err(e) - } - } - }); + if scan_ref.builder.is_none() { + return CResult::Error; + } - match stream_result { - Ok(stream) => { - scan_ref.stream = Some(Mutex::new(stream)); - } - Err(_) => { - return IcebergResult::IcebergError; - } - } - } else { - set_error("Table not available".to_string()); - return IcebergResult::IcebergError; + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref.builder.map(|b| b.with_batch_size(Some(n))), + scan: scan_ref.scan, + })); + + return CResult::Ok; +} + +#[no_mangle] +pub extern "C" fn iceberg_scan_build(scan: &mut *mut IcebergScan) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let scan_ref = unsafe { Box::from_raw(*scan) }; + if scan_ref.builder.is_none() { + return CResult::Error; + } + let builder = scan_ref.builder.unwrap(); + + match builder.build() { + Ok(table_scan) => { + *scan = Box::into_raw(Box::new(IcebergScan { + builder: None, + scan: Some(table_scan), + })); + CResult::Ok } + Err(_) => CResult::Error, } +} - // Get next batch from stream - if let Some(stream_mutex) = &scan_ref.stream { - let result = get_runtime().block_on(async { - let mut stream = stream_mutex.lock().unwrap(); - stream.next().await - }); +// Async function to initialize stream from a table scan without getting first batch +export_runtime_op!( + iceberg_arrow_stream, + IcebergArrowStreamResponse, + || { + if scan.is_null() { + return Err(anyhow::anyhow!("Null scan pointer provided")); + } + let scan_ref = unsafe { &(*scan).scan }; + if scan_ref.is_none() { + return Err(anyhow::anyhow!("Scan not initialized")); + } - match result { - Some(Ok(record_batch)) => match serialize_record_batch(record_batch) { - Ok(arrow_batch) => { - let batch_ptr = Box::into_raw(Box::new(arrow_batch)); - unsafe { - *batch = batch_ptr; - } - IcebergResult::IcebergOk - } - Err(e) => { - set_error(format!("Failed to serialize batch: {}", e)); - IcebergResult::IcebergError - } - }, - Some(Err(e)) => { - set_error(format!("Error reading batch: {}", e)); - IcebergResult::IcebergError + return Ok(scan_ref.as_ref().unwrap()); + }, + scan_ref, + async { + let stream = scan_ref.to_arrow().await?; + Ok::(IcebergArrowStream { + stream: AsyncMutex::new(stream), + }) + }, + scan: *mut IcebergScan +); + +// Async function to get next batch from existing stream +export_runtime_op!( + iceberg_next_batch, + IcebergBatchResponse, + || { + if stream.is_null() { + return Err(anyhow::anyhow!("Null stream pointer provided")); + } + let stream_ref = unsafe { &*stream }; + Ok(stream_ref) + }, + stream_ref, + async { + let mut stream_guard = stream_ref.stream.lock().await; + + match stream_guard.try_next().await { + Ok(Some(record_batch)) => { + Ok(Some(record_batch)) } - None => { + Ok(None) => { // End of stream - unsafe { - *batch = ptr::null_mut(); - } - IcebergResult::IcebergEndOfStream + tracing::debug!("End of stream reached"); + Ok(None) } + Err(e) => Err(anyhow::anyhow!("Error reading batch: {}", e)), + } + }, + stream: *mut IcebergArrowStream +); + +// Synchronous operations +#[no_mangle] +pub extern "C" fn iceberg_table_free(table: *mut IcebergTable) { + if !table.is_null() { + unsafe { + let _ = Box::from_raw(table); } - } else { - set_error("Stream not initialized".to_string()); - IcebergResult::IcebergError } } #[no_mangle] -pub extern "C" fn iceberg_arrow_batch_free(batch: *mut ArrowBatch) { - if !batch.is_null() { +pub extern "C" fn iceberg_scan_free(scan: &mut *mut IcebergScan) { + if !scan.is_null() { unsafe { - let batch_ref = Box::from_raw(batch); - if !batch_ref.rust_ptr.is_null() { - let _ = Box::from_raw(batch_ref.rust_ptr as *mut Vec); - } + let _ = Box::from_raw(*scan); + *scan = ptr::null_mut(); } } } #[no_mangle] -pub extern "C" fn iceberg_error_message() -> *const c_char { - LAST_ERROR.with(|e| { - if let Some(ref error) = *e.borrow() { - match CString::new(error.clone()) { - Ok(cstring) => cstring.into_raw(), - Err(_) => ptr::null(), - } - } else { - ptr::null() +pub extern "C" fn iceberg_arrow_stream_free(stream: *mut IcebergArrowStream) { + if !stream.is_null() { + unsafe { + let _ = Box::from_raw(stream); } - }) + } +} + +#[no_mangle] +pub extern "C" fn iceberg_arrow_batch_free(batch: *mut ArrowBatch) { + if batch.is_null() { + return; + } + + unsafe { + let batch_ref = Box::from_raw(batch); + if !batch_ref.rust_ptr.is_null() { + let _ = Box::from_raw(batch_ref.rust_ptr as *mut Vec); + } + } +} +// Re-export object_store_ffi utilities +#[no_mangle] +pub extern "C" fn iceberg_destroy_cstring(string: *mut c_char) -> CResult { + destroy_cstring(string) +} + +#[no_mangle] +pub extern "C" fn iceberg_current_metrics() -> *const c_char { + current_metrics() +} + +// Re-export context management functions for cancellation support +#[no_mangle] +pub extern "C" fn iceberg_cancel_context(ctx_ptr: *const Context) -> CResult { + cancel_context(ctx_ptr) +} + +#[no_mangle] +pub extern "C" fn iceberg_destroy_context(ctx_ptr: *const Context) -> CResult { + destroy_context(ctx_ptr) } diff --git a/tests/integration_test.c b/tests/integration_test.c index 65e36cc..86bfbd9 100644 --- a/tests/integration_test.c +++ b/tests/integration_test.c @@ -2,22 +2,46 @@ #include #include #include - -// Global function pointers -static iceberg_table_open_func_t iceberg_table_open_func = NULL; -static iceberg_table_free_func_t iceberg_table_free_func = NULL; -static iceberg_table_scan_func_t iceberg_table_scan_func = NULL; -static iceberg_scan_select_columns_func_t iceberg_scan_select_columns_func = NULL; -static iceberg_scan_free_func_t iceberg_scan_free_func = NULL; -static iceberg_scan_next_batch_func_t iceberg_scan_next_batch_func = NULL; -static iceberg_arrow_batch_free_func_t iceberg_arrow_batch_free_func = NULL; -static iceberg_error_message_func_t iceberg_error_message_func = NULL; +#include +#include +#include +#include + +// Global function pointers for new async API +static int (*iceberg_init_runtime_func)(IcebergStaticConfig config, int (*panic_callback)(void), int (*result_callback)(const void*)) = NULL; +static int (*iceberg_table_open_func)(const char*, const char*, IcebergTableResponse*, const void*) = NULL; +static IcebergScan* (*iceberg_new_scan_func)(IcebergTable*) = NULL; +static int (*iceberg_scan_build_func)(IcebergScan**) = NULL; +static void (*iceberg_scan_free_func)(IcebergScan**) = NULL; +static int (*iceberg_arrow_stream_func)(IcebergScan*, IcebergArrowStreamResponse*, const void*) = NULL; +static int (*iceberg_next_batch_func)(IcebergArrowStream*, IcebergBatchResponse*, const void*) = NULL; +static void (*iceberg_table_free_func)(IcebergTable*) = NULL; +static void (*iceberg_arrow_stream_free_func)(IcebergArrowStream*) = NULL; +static void (*iceberg_arrow_batch_free_func)(ArrowBatch*) = NULL; +static int (*iceberg_destroy_cstring_func)(char*) = NULL; +static int (*iceberg_cancel_context_func)(const void*) = NULL; +static int (*iceberg_destroy_context_func)(const void*) = NULL; // Library handle static void* lib_handle = NULL; +// Callback implementations +static int panic_callback(void) { + printf("🚨 Rust panic occurred!\n"); + return 1; +} + +volatile int async_completed = 0; + +static int result_callback(const void* task) { + (void)task; // Suppress unused parameter warning + // Signal that async operation completed + async_completed = 1; + return 0; +} + // Function to load the library and resolve symbols -int load_iceberg_library(const char* library_path) { +static int load_iceberg_library(const char* library_path) { printf("Loading Iceberg C API library from %s...\n", library_path); // Try to open the dynamic library @@ -32,52 +56,81 @@ int load_iceberg_library(const char* library_path) { // Clear any existing error dlerror(); - // Resolve function symbols - iceberg_table_open_func = (iceberg_table_open_func_t)dlsym(lib_handle, "iceberg_table_open"); + // Resolve function symbols for new async API + iceberg_init_runtime_func = (int (*)(IcebergStaticConfig, int (*)(void), int (*)(const void*)))dlsym(lib_handle, "iceberg_init_runtime"); + if (!iceberg_init_runtime_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_init_runtime: %s\n", dlerror()); + return 0; + } + + iceberg_table_open_func = (int (*)(const char*, const char*, IcebergTableResponse*, const void*))dlsym(lib_handle, "iceberg_table_open"); if (!iceberg_table_open_func) { fprintf(stderr, "❌ Failed to resolve iceberg_table_open: %s\n", dlerror()); return 0; } - iceberg_table_free_func = (iceberg_table_free_func_t)dlsym(lib_handle, "iceberg_table_free"); - if (!iceberg_table_free_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_table_free: %s\n", dlerror()); + iceberg_new_scan_func = (IcebergScan* (*)(IcebergTable*))dlsym(lib_handle, "iceberg_new_scan"); + if (!iceberg_new_scan_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_new_scan: %s\n", dlerror()); return 0; } - iceberg_table_scan_func = (iceberg_table_scan_func_t)dlsym(lib_handle, "iceberg_table_scan"); - if (!iceberg_table_scan_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_table_scan: %s\n", dlerror()); + iceberg_scan_build_func = (int (*)(IcebergScan**))dlsym(lib_handle, "iceberg_scan_build"); + if (!iceberg_scan_build_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_scan_build: %s\n", dlerror()); return 0; } - iceberg_scan_select_columns_func = (iceberg_scan_select_columns_func_t)dlsym(lib_handle, "iceberg_scan_select_columns"); - if (!iceberg_scan_select_columns_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_scan_select_columns: %s\n", dlerror()); + iceberg_arrow_stream_func = (int (*)(IcebergScan*, IcebergArrowStreamResponse*, const void*))dlsym(lib_handle, "iceberg_arrow_stream"); + if (!iceberg_arrow_stream_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_arrow_stream: %s\n", dlerror()); + return 0; + } + iceberg_next_batch_func = (int (*)(IcebergArrowStream*, IcebergBatchResponse*, const void*))dlsym(lib_handle, "iceberg_next_batch"); + if (!iceberg_next_batch_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_next_batch: %s\n", dlerror()); return 0; } - iceberg_scan_free_func = (iceberg_scan_free_func_t)dlsym(lib_handle, "iceberg_scan_free"); + iceberg_table_free_func = (void (*)(IcebergTable*))dlsym(lib_handle, "iceberg_table_free"); + if (!iceberg_table_free_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_table_free: %s\n", dlerror()); + return 0; + } + + iceberg_scan_free_func = (void (*)(IcebergScan**))dlsym(lib_handle, "iceberg_scan_free"); if (!iceberg_scan_free_func) { fprintf(stderr, "❌ Failed to resolve iceberg_scan_free: %s\n", dlerror()); return 0; } - iceberg_scan_next_batch_func = (iceberg_scan_next_batch_func_t)dlsym(lib_handle, "iceberg_scan_next_batch"); - if (!iceberg_scan_next_batch_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_scan_next_batch: %s\n", dlerror()); + iceberg_arrow_stream_free_func = (void (*)(IcebergArrowStream*))dlsym(lib_handle, "iceberg_arrow_stream_free"); + if (!iceberg_arrow_stream_free_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_arrow_stream_free: %s\n", dlerror()); return 0; } - iceberg_arrow_batch_free_func = (iceberg_arrow_batch_free_func_t)dlsym(lib_handle, "iceberg_arrow_batch_free"); + iceberg_arrow_batch_free_func = (void (*)(ArrowBatch*))dlsym(lib_handle, "iceberg_arrow_batch_free"); if (!iceberg_arrow_batch_free_func) { fprintf(stderr, "❌ Failed to resolve iceberg_arrow_batch_free: %s\n", dlerror()); return 0; } - iceberg_error_message_func = (iceberg_error_message_func_t)dlsym(lib_handle, "iceberg_error_message"); - if (!iceberg_error_message_func) { - fprintf(stderr, "❌ Failed to resolve iceberg_error_message: %s\n", dlerror()); + iceberg_destroy_cstring_func = (int (*)(char*))dlsym(lib_handle, "iceberg_destroy_cstring"); + if (!iceberg_destroy_cstring_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_destroy_cstring: %s\n", dlerror()); + return 0; + } + + iceberg_cancel_context_func = (int (*)(const void*))dlsym(lib_handle, "iceberg_cancel_context"); + if (!iceberg_cancel_context_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_cancel_context: %s\n", dlerror()); + return 0; + } + + iceberg_destroy_context_func = (int (*)(const void*))dlsym(lib_handle, "iceberg_destroy_context"); + if (!iceberg_destroy_context_func) { + fprintf(stderr, "❌ Failed to resolve iceberg_destroy_context: %s\n", dlerror()); return 0; } @@ -86,7 +139,7 @@ int load_iceberg_library(const char* library_path) { } // Function to unload the library -void unload_iceberg_library() { +static void unload_iceberg_library(void) { if (lib_handle) { dlclose(lib_handle); lib_handle = NULL; @@ -95,7 +148,7 @@ void unload_iceberg_library() { } int main(int argc, char* argv[]) { - printf("Starting Iceberg C API integration test with dynamic loading...\n"); + printf("Starting Iceberg C API integration test with new async API...\n"); // Check for one command line argument (the path to the library) if (argc < 2) { @@ -103,73 +156,206 @@ int main(int argc, char* argv[]) { return 1; } + // Check if environment variables are set + printf("Environment variables:\n"); + printf(" AWS_ACCESS_KEY_ID: %s\n", getenv("AWS_ACCESS_KEY_ID") ? "SET" : "NOT SET"); + printf(" AWS_SECRET_ACCESS_KEY: %s\n", getenv("AWS_SECRET_ACCESS_KEY") ? "SET" : "NOT SET"); + printf(" AWS_DEFAULT_REGION: %s\n", getenv("AWS_DEFAULT_REGION") ? getenv("AWS_DEFAULT_REGION") : "NOT SET"); + printf(" AWS_ENDPOINT_URL: %s\n", getenv("AWS_ENDPOINT_URL") ? getenv("AWS_ENDPOINT_URL") : "NOT SET"); + + // Load the library if (!load_iceberg_library(argv[1])) { fprintf(stderr, "Failed to load Iceberg library\n"); return 1; } - IcebergTable* table = NULL; - IcebergScan* scan = NULL; + // 1. Initialize the runtime + printf("Initializing Iceberg runtime...\n"); + IcebergStaticConfig config = {0}; // Default config - 0 threads means use default + int result = iceberg_init_runtime_func(config, panic_callback, result_callback); + if (result != CRESULT_OK) { + printf("❌ Failed to initialize runtime\n"); + unload_iceberg_library(); + return 1; + } + printf("✅ Runtime initialized successfully\n"); - // 1. Open table from folder path + // 2. Open table using async API const char* table_path = "s3://warehouse/tpch.sf01/nation"; const char* metadata_path = "metadata/00001-44f668fe-3688-49d5-851f-36e75d143321.metadata.json"; printf("Opening table at: %s\n", table_path); printf("Using metadata file: %s\n", metadata_path); - IcebergResult result = iceberg_table_open_func(table_path, metadata_path, &table); - if (result != ICEBERG_OK) { - printf("❌ Failed to open table: %s\n", iceberg_error_message_func()); + IcebergTableResponse table_response = {0}; + async_completed = 0; // Reset flag + result = iceberg_table_open_func(table_path, metadata_path, &table_response, (const void*)(uintptr_t)&async_completed); + + if (result != CRESULT_OK) { + printf("❌ Failed to initiate table open operation\n"); + unload_iceberg_library(); + return 1; + } + + // Wait for async operation to complete + printf("⏳ Waiting for table open to complete...\n"); + int timeout = 100; // 10 second timeout + while (!async_completed && timeout > 0) { + usleep(100000); // 100ms + timeout--; + } + + if (!async_completed) { + printf("❌ Async operation timed out\n"); + unload_iceberg_library(); + return 1; + } + + // Check if the operation was successful + if (table_response.result != CRESULT_OK) { + printf("❌ Failed to open table (result=%d)", table_response.result); + if (table_response.error_message) { + printf(": %s", table_response.error_message); + iceberg_destroy_cstring_func(table_response.error_message); + } + printf("\n"); + unload_iceberg_library(); + return 1; + } + + if (!table_response.table) { + printf("❌ No table returned from open operation\n"); unload_iceberg_library(); return 1; } + printf("✅ Table opened successfully\n"); - // 2. Create a scan - result = iceberg_table_scan_func(table, &scan); - if (result != ICEBERG_OK) { - printf("❌ Failed to create scan: %s\n", iceberg_error_message_func()); - iceberg_table_free_func(table); + // 3. Create a scan using async API + IcebergScan *scan = iceberg_new_scan_func(table_response.table); + + if (scan == NULL) { + printf("❌ Failed to create scan\n"); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + // Print the scan pointer + printf("Scan pointer: %p\n", (void*)scan); + + result = iceberg_scan_build_func(&scan); + + if (result != CRESULT_OK) { + printf("❌ Failed to initiate scan creation\n"); + iceberg_table_free_func(table_response.table); unload_iceberg_library(); return 1; } + + // Print the scan pointer + printf("Scan pointer: %p\n", (void*)scan); + printf("✅ Scan created successfully\n"); - // 3. Optionally select specific columns (commented out since we don't know schema yet) - // const char* columns[] = {"id", "value"}; - // iceberg_scan_select_columns_func(scan, columns, 2); + printf("Step 1: Initializing stream asynchronously...\n"); + IcebergArrowStreamResponse stream_response = {0}; + async_completed = 0; + result = iceberg_arrow_stream_func(scan, &stream_response, (const void*)(uintptr_t)&async_completed); + if (result != CRESULT_OK) { + printf("❌ Failed to create stream\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + // Wait for completion + printf("⏳ Waiting for stream creation to complete...\n"); + timeout = 100; // 10 second timeout + while (!async_completed && timeout > 0) { + usleep(100000); // 100ms + timeout--; + } + + if (!async_completed) { + printf("❌ Stream creation async operation timed out\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } + + // Check if operation was successful + if (stream_response.result != CRESULT_OK) { + printf("❌ Failed to create stream"); + if (stream_response.error_message) { + printf(": %s", stream_response.error_message); + iceberg_destroy_cstring_func(stream_response.error_message); + } + printf("\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } - // 4. Iterate through Arrow batches as serialized bytes - int batch_count = 0; - size_t total_bytes = 0; + if (!stream_response.stream) { + printf("❌ No stream returned from stream creation\n"); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } - while (true) { - ArrowBatch* batch = NULL; + printf("✅ Stream created successfully\n"); - result = iceberg_scan_next_batch_func(scan, &batch); + // 4. Try to get a batch using new two-step async API + printf("Step 2: Getting first batch from stream asynchronously...\n"); + IcebergBatchResponse batch_response = {0}; + async_completed = 0; // Reset flag + result = iceberg_next_batch_func(stream_response.stream, &batch_response, (const void*)(uintptr_t)&async_completed); - if (result == ICEBERG_END_OF_STREAM) { - printf("✅ Reached end of stream\n"); - break; + if (result == CRESULT_OK) { + // Wait for batch retrieval to complete + timeout = 100; // 10 second timeout + while (!async_completed && timeout > 0) { + usleep(100000); // 100ms + timeout--; } - if (result != ICEBERG_OK) { - printf("❌ Failed to get next batch: %s\n", iceberg_error_message_func()); - break; + if (!async_completed) { + printf("❌ Batch retrieval async operation timed out\n"); + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; } + } - if (batch == NULL) { - printf("❌ Received NULL batch\n"); - break; + if (result != CRESULT_OK) { + printf("❌ Failed to get first batch from stream\n"); + if (batch_response.error_message) { + printf(" Error: %s\n", batch_response.error_message); + iceberg_destroy_cstring_func(batch_response.error_message); } + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); + unload_iceberg_library(); + return 1; + } - batch_count++; - total_bytes += batch->length; + printf("Step 3: Checking batch result...\n"); - printf("📦 Batch %d:\n", batch_count); + ArrowBatch* batch = batch_response.batch; + + if (batch) { + printf("✅ Successfully retrieved batch!\n"); + printf("📦 Batch details:\n"); printf(" - Serialized size: %zu bytes\n", batch->length); - printf(" - Data pointer: %p\n", (void*)batch->data); + printf(" - Data pointer: %p\n", (const void*)batch->data); printf(" - First few bytes: "); // Print first 8 bytes as hex for verification @@ -178,27 +364,47 @@ int main(int argc, char* argv[]) { printf("%02x ", batch->data[i]); } printf("\n"); - - // This is where you would pass the serialized Arrow data to Julia - // In Julia, you would: - // 1. Create an IOBuffer from the bytes: IOBuffer(unsafe_wrap(Array, batch->data, batch->length)) - // 2. Use Arrow.jl to read: Arrow.Stream(io_buffer) printf(" → Arrow IPC bytes ready for Julia Arrow.Stream()\n"); - // Free the batch (this calls back to Rust to free memory) + // Free the batch directly iceberg_arrow_batch_free_func(batch); + } else { + printf("✅ Reached end of stream (no more batches)\n"); } - printf("📊 Summary:\n"); - printf(" - Total batches: %d\n", batch_count); - printf(" - Total bytes processed: %zu\n", total_bytes); + // 4. Test context cancellation functions + printf("Testing context cancellation functions...\n"); + + // Test that cancellation functions can be called with valid context pointers + if (table_response.context != NULL) { + printf(" - Testing cancel_context with table context...\n"); + int cancel_result = iceberg_cancel_context_func(table_response.context); + if (cancel_result == 0) { + printf(" ✅ cancel_context succeeded\n"); + } else { + printf(" ⚠️ cancel_context returned: %d\n", cancel_result); + } + + printf(" - Testing destroy_context with table context...\n"); + int destroy_result = iceberg_destroy_context_func(table_response.context); + if (destroy_result == 0) { + printf(" ✅ destroy_context succeeded\n"); + } else { + printf(" ⚠️ destroy_context returned: %d\n", destroy_result); + } + table_response.context = NULL; // Mark as cleaned up + } + + printf("✅ Context cancellation functions tested successfully\n"); // 5. Cleanup - iceberg_scan_free_func(scan); - iceberg_table_free_func(table); + printf("Cleaning up resources...\n"); + iceberg_arrow_stream_free_func(stream_response.stream); + iceberg_scan_free_func(&scan); + iceberg_table_free_func(table_response.table); unload_iceberg_library(); printf("✅ Integration test completed successfully!\n"); - printf("🚀 Ready for Julia bindings integration\n"); + printf("🚀 New async API is working correctly\n"); return 0; -} \ No newline at end of file +}