
Commit b288002

Add builder API and remove unnecessary Arcs (#25)
1 parent 51a510a commit b288002

5 files changed: +109 -53 lines changed
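
In short, this commit deprecates the `HdfsObjectStore::with_url` and `with_config` constructors in favor of a new `HdfsObjectStoreBuilder`, and stores the `hdfs-native` `Client` directly rather than behind an `Arc`. A minimal sketch of the new entry point, reusing the placeholder address from the README example (the surrounding `main` is illustrative only):

```rust
use hdfs_native_object_store::HdfsObjectStoreBuilder;
use object_store::Result;

fn main() -> Result<()> {
    // Builder-based construction introduced by this commit; the URL is the
    // placeholder address from the README example, not a real cluster.
    let store = HdfsObjectStoreBuilder::new()
        .with_url("hdfs://localhost:9000")
        .build()?;

    // The resulting store is used through the object_store::ObjectStore trait as before.
    println!("{store}");
    Ok(())
}
```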

.github/workflows/rust-test.yml

Lines changed: 0 additions & 12 deletions

@@ -41,9 +41,6 @@ jobs:

       - uses: Swatinem/rust-cache@v2

-      - name: Install native libs
-        run: sudo apt-get install -y libkrb5-dev
-
       - name: build and lint with clippy
         run: cargo clippy --all-targets --features integration-test -- -D warnings

@@ -84,14 +81,5 @@ jobs:
           distribution: "temurin"
           java-version: "17"

-      - name: Install native libs
-        run: sudo apt-get install -y libkrb5-dev krb5-user
-
-      - name: Download Hadoop
-        run: |
-          wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
-          tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE
-          echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH
-
       - name: Run tests
         run: cargo test --features integration-test
Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@ async-trait = "0.1"
 bytes = "1"
 chrono = "0.4"
 futures = "0.3"
-hdfs-native = "0.12"
+hdfs-native = "0.12.2"
 object_store = "0.12.2"
 thiserror = "2"
 tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "sync", "time"] }

README.md

Lines changed: 3 additions & 3 deletions

@@ -12,12 +12,12 @@ Each release supports a certain minor release of both the `object_store` crate a
 |0.12.x|>=0.10, <0.12|0.10|
 |0.13.x|>=0.10, <0.12|0.11|
 |0.14.x|0.12|0.11|
-|0.15.x|0.12|0.12|
+|0.15.x|>=0.12.2, <0.13|0.12|

 # Usage

 ```rust
-use hdfs_native_object_store::HdfsObjectStore;
-let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
+use hdfs_native_object_store::HdfsObjectStoreBuilder;
+let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
 ```

 # Documentation

src/lib.rs

Lines changed: 103 additions & 35 deletions

@@ -3,20 +3,18 @@
 //! # Usage
 //!
 //! ```rust
-//! use hdfs_native_object_store::HdfsObjectStore;
-//! # use object_store::Result;
-//! # fn main() -> Result<()> {
-//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
-//! # Ok(())
-//! # }
+//! use hdfs_native_object_store::HdfsObjectStoreBuilder;
+//! let store = HdfsObjectStoreBuilder::new()
+//!     .with_url("hdfs://localhost:9000")
+//!     .build()
+//!     .unwrap();
 //! ```
 //!
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter},
     future,
     path::PathBuf,
-    sync::Arc,
 };

 use async_trait::async_trait;

@@ -34,6 +32,7 @@ use object_store::{
     ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
 };
 use tokio::{
+    runtime::Handle,
     sync::{mpsc, oneshot},
     task::{self, JoinHandle},
 };
@@ -51,23 +50,64 @@ fn generic_error(
     }
 }

+/// Builder for creating an [HdfsObjectStore]
+#[derive(Default)]
+pub struct HdfsObjectStoreBuilder {
+    inner: ClientBuilder,
+}
+
+impl HdfsObjectStoreBuilder {
+    /// Create a new [HdfsObjectStoreBuilder]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
+    pub fn with_url(mut self, url: impl Into<String>) -> Self {
+        self.inner = self.inner.with_url(url);
+        self
+    }
+
+    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
+    pub fn with_config(
+        mut self,
+        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
+    ) -> Self {
+        self.inner = self.inner.with_config(config);
+        self
+    }
+
+    // Use a dedicated tokio runtime for spawned tasks and IO operations
+    pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
+        self.inner = self.inner.with_io_runtime(runtime);
+        self
+    }
+
+    /// Create the [HdfsObjectStore]] instance from the provided settings
+    pub fn build(self) -> Result<HdfsObjectStore> {
+        let client = self.inner.build().to_object_store_err()?;
+
+        Ok(HdfsObjectStore { client })
+    }
+}
+
 /// Interface for [Hadoop Distributed File System](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
 #[derive(Debug, Clone)]
 pub struct HdfsObjectStore {
-    client: Arc<Client>,
+    client: Client,
 }

 impl HdfsObjectStore {
-    /// Creates a new HdfsObjectStore from an existing [Client]
+    /// Creates a new HdfsObjectStore from an existing `hdfs-native` [Client]
     ///
     /// ```rust
     /// # use std::sync::Arc;
     /// use hdfs_native::ClientBuilder;
     /// # use hdfs_native_object_store::HdfsObjectStore;
     /// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap();
-    /// let store = HdfsObjectStore::new(Arc::new(client));
+    /// let store = HdfsObjectStore::new(client);
     /// ```
-    pub fn new(client: Arc<Client>) -> Self {
+    pub fn new(client: Client) -> Self {
         Self { client }
     }

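Building on the builder methods above, a hedged sketch of passing Hadoop client settings through `with_config`; the NameService name "ns1", the hostnames, and the ports are made-up examples, and the keys are ordinary Hadoop configuration entries that override anything loaded from the default config files:

```rust
use hdfs_native_object_store::HdfsObjectStoreBuilder;

fn build_for_nameservice() -> object_store::Result<()> {
    // "ns1" and the example.com hosts are placeholders for a real HA NameService.
    let store = HdfsObjectStoreBuilder::new()
        .with_url("hdfs://ns1")
        .with_config([
            ("dfs.nameservices", "ns1"),
            ("dfs.ha.namenodes.ns1", "nn1,nn2"),
            ("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020"),
            ("dfs.namenode.rpc-address.ns1.nn2", "nn2.example.com:8020"),
        ])
        .build()?;

    let _ = store;
    Ok(())
}
```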

@@ -81,13 +121,14 @@ impl HdfsObjectStore {
     /// # Ok(())
     /// # }
     /// ```
+    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
     pub fn with_url(url: &str) -> Result<Self> {
-        Ok(Self::new(Arc::new(
-            ClientBuilder::new()
-                .with_url(url)
-                .build()
-                .to_object_store_err()?,
-        )))
+        let client = ClientBuilder::new()
+            .with_url(url)
+            .build()
+            .to_object_store_err()?;
+
+        Ok(Self { client })
     }

     /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.

@@ -106,14 +147,15 @@ impl HdfsObjectStore {
     /// # Ok(())
     /// # }
     /// ```
+    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
     pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
-        Ok(Self::new(Arc::new(
-            ClientBuilder::new()
-                .with_url(url)
-                .with_config(config)
-                .build()
-                .to_object_store_err()?,
-        )))
+        let client = ClientBuilder::new()
+            .with_url(url)
+            .with_config(config)
+            .build()
+            .to_object_store_err()?;
+
+        Ok(Self { client })
     }

     async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
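For code still on the now-deprecated constructors, the equivalent builder calls would look roughly like this; the wrapper function and its arguments are illustrative only:

```rust
use std::collections::HashMap;

use hdfs_native_object_store::{HdfsObjectStore, HdfsObjectStoreBuilder};
use object_store::Result;

#[allow(deprecated)]
fn migrate(url: &str, config: HashMap<String, String>) -> Result<()> {
    // Previously: a deprecated constructor that wraps the Client internally.
    let _old = HdfsObjectStore::with_config(url, config.clone())?;

    // Now: the same URL and configs go through the builder added in this commit.
    let _new = HdfsObjectStoreBuilder::new()
        .with_url(url)
        .with_config(config)
        .build()?;

    Ok(())
}
```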
@@ -190,7 +232,7 @@ impl Display for HdfsObjectStore {

 impl From<Client> for HdfsObjectStore {
     fn from(value: Client) -> Self {
-        Self::new(Arc::new(value))
+        Self { client: value }
     }
 }

@@ -257,7 +299,7 @@ impl ObjectStore for HdfsObjectStore {
         let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

         Ok(Box::new(HdfsMultipartWriter::new(
-            Arc::clone(&self.client),
+            self.client.clone(),
             tmp_file,
             &tmp_file_path,
             &final_file_path,
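With the caller-side `Arc` gone, sharing a store between tasks relies on `HdfsObjectStore` (and the underlying `Client`) being `Clone`, as the `#[derive(Debug, Clone)]` on the struct and the `self.client.clone()` call here indicate. A sketch of what that looks like for a caller, assuming a running tokio runtime and placeholder URL and paths:

```rust
use hdfs_native_object_store::HdfsObjectStoreBuilder;
use object_store::{path::Path, ObjectStore, Result};

async fn share_across_tasks() -> Result<()> {
    // Placeholder NameNode address, as in the README example.
    let store = HdfsObjectStoreBuilder::new()
        .with_url("hdfs://localhost:9000")
        .build()?;

    // HdfsObjectStore derives Clone, so each task takes its own handle
    // without any Arc wrapping on the caller's side.
    let store_for_task = store.clone();
    let task = tokio::spawn(async move {
        store_for_task.head(&Path::from("tmp/example-a")).await
    });

    let _meta = store.head(&Path::from("tmp/example-b")).await?;
    let _meta = task.await.expect("task panicked")?;
    Ok(())
}
```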
@@ -491,19 +533,14 @@ type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload
 // A once cell is used to track whether a part has finished writing or not.
 // On completing, rename the file to the actual target.
 struct HdfsMultipartWriter {
-    client: Arc<Client>,
+    client: Client,
     sender: Option<(JoinHandle<Result<()>>, PartSender)>,
     tmp_filename: String,
     final_filename: String,
 }

 impl HdfsMultipartWriter {
-    fn new(
-        client: Arc<Client>,
-        writer: FileWriter,
-        tmp_filename: &str,
-        final_filename: &str,
-    ) -> Self {
+    fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
         let (sender, receiver) = mpsc::unbounded_channel();

         Self {

@@ -659,16 +696,22 @@ mod test {
     use std::collections::HashSet;

     use object_store::integration::*;
+    use serial_test::serial;
+    use tokio::runtime::Runtime;

-    use crate::HdfsObjectStore;
+    use crate::HdfsObjectStoreBuilder;

     #[tokio::test]
+    #[serial]
     async fn hdfs_test() {
         let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
             hdfs_native::minidfs::DfsFeatures::HA,
         ]));

-        let integration = HdfsObjectStore::with_url(&dfs.url).unwrap();
+        let integration = HdfsObjectStoreBuilder::new()
+            .with_url(&dfs.url)
+            .build()
+            .unwrap();

         put_get_delete_list(&integration).await;
         list_uses_directories_correctly(&integration).await;
@@ -680,4 +723,29 @@ mod test {
         get_opts(&integration).await;
         put_opts(&integration, false).await;
     }
+
+    #[test]
+    #[serial]
+    fn test_no_tokio() {
+        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
+            hdfs_native::minidfs::DfsFeatures::HA,
+        ]));
+
+        let integration = HdfsObjectStoreBuilder::new()
+            .with_url(&dfs.url)
+            .build()
+            .unwrap();
+
+        futures::executor::block_on(get_opts(&integration));
+
+        let rt = Runtime::new().unwrap();
+
+        let integration = HdfsObjectStoreBuilder::new()
+            .with_url(&dfs.url)
+            .with_io_runtime(rt.handle().clone())
+            .build()
+            .unwrap();
+
+        futures::executor::block_on(get_opts(&integration));
+    }
 }
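
The new `test_no_tokio` test above exercises `with_io_runtime` from fully synchronous code: a dedicated runtime owns the store's spawned tasks and IO, while the caller drives individual futures with `futures::executor::block_on`. An application without its own async runtime could follow the same pattern, roughly as sketched below (URL, path, and function name are placeholders):

```rust
use hdfs_native_object_store::HdfsObjectStoreBuilder;
use object_store::{path::Path, ObjectStore};
use tokio::runtime::Runtime;

fn sync_caller() -> object_store::Result<()> {
    // Dedicated runtime that handles the store's spawned tasks and network IO.
    let rt = Runtime::new().expect("failed to create tokio runtime");

    let store = HdfsObjectStoreBuilder::new()
        .with_url("hdfs://localhost:9000") // placeholder address
        .with_io_runtime(rt.handle().clone())
        .build()?;

    // Drive calls with a lightweight executor instead of #[tokio::main],
    // mirroring the block_on usage in test_no_tokio.
    let meta = futures::executor::block_on(store.head(&Path::from("tmp/example")))?;
    println!("{} is {} bytes", meta.location, meta.size);
    Ok(())
}
```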
