Skip to content

Commit

Permalink
feat(services/webdfs): Add user.name support for webhdfs
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 22, 2025
1 parent cca3a87 commit 81af05b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
50 changes: 46 additions & 4 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ impl WebhdfsBuilder {
self
}

/// Set the username of this backend,
/// used for authentication
///
pub fn username(mut self, username: &str) -> Self {
if !username.is_empty() {
self.config.user_name = Some(username.to_string());
}
self
}

/// Set the delegation token of this backend,
/// used for authentication
///
Expand Down Expand Up @@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
let backend = WebhdfsBackend {
root,
endpoint,
user_name: self.config.user_name,
auth,
client,
root_checker: OnceCell::new(),
Expand All @@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
pub struct WebhdfsBackend {
root: String,
endpoint: String,
user_name: Option<String>,
auth: Option<String>,
root_checker: OnceCell<()>,

Expand All @@ -212,6 +224,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -220,6 +235,7 @@ impl WebhdfsBackend {

req.body(Buffer::new()).map_err(new_request_build_error)
}

/// create object
pub async fn webhdfs_create_object_request(
&self,
Expand All @@ -235,6 +251,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand Down Expand Up @@ -277,6 +296,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand Down Expand Up @@ -311,7 +333,9 @@ impl WebhdfsBackend {
percent_encode_path(&from),
percent_encode_path(&to)
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -330,7 +354,9 @@ impl WebhdfsBackend {
body: Buffer,
) -> Result<Request<Buffer>> {
let mut url = location.to_string();

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand Down Expand Up @@ -362,7 +388,9 @@ impl WebhdfsBackend {
percent_encode_path(&p),
percent_encode_path(&sources),
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -379,6 +407,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -404,6 +435,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -429,6 +463,9 @@ impl WebhdfsBackend {
if !start_after.is_empty() {
url += format!("&startAfter={}", start_after).as_str();
}
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -455,7 +492,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -474,6 +513,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/webhdfs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
pub root: Option<String>,
/// Endpoint for webhdfs.
pub endpoint: Option<String>,
/// Name of the user for webhdfs.
pub user_name: Option<String>,
/// Delegation token for webhdfs.
pub delegation: Option<String>,
/// Disable batch listing
Expand All @@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
f.debug_struct("WebhdfsConfig")
.field("root", &self.root)
.field("endpoint", &self.endpoint)
.field("user_name", &self.user_name)
.field("atomic_write_dir", &self.atomic_write_dir)
.finish_non_exhaustive()
}
Expand Down

0 comments on commit 81af05b

Please sign in to comment.