Skip to content

Commit 5317484

Browse files
committed
Add support for *_proxy env vars
This includes `http_proxy`, `https_proxy` and `all_proxy` semi-standard env vars. This supports both capital and lower-case env vars because these variables are confusing to everyone. Curl may have introduced them, and they use lower-case for `http_proxy` and screaming-snake case for all others[1]. Stack Exchange answers around these questions have multiple conflicting recommendations (although this one is good[2]), and I've personally seen folks think that only one or the other (lower, upper) or mixed-use are required for all projects. Since folks are going to be coming to us with some previous experience and confusion around these, just supporting both seems like the thing that will cause the fewest number of support requests. Some downsides of this commit: * It unfortunately re-introduces time v0.1 into our dependecy tree, but I have a PR up to remove it. * PubNub sources don't respect the headers, the client library hard-codes their hyper::Client type parameters, and `hyper::Client<impl Connect + Clone>` in general is hard to type-erase. [1]: https://curl.se/docs/manual.html#environment-variables [2]: https://superuser.com/a/1166790
1 parent 776c776 commit 5317484

File tree

19 files changed

+263
-41
lines changed

19 files changed

+263
-41
lines changed

Cargo.lock

Lines changed: 89 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ members = [
1313
"src/dataflow-types",
1414
"src/dataflow",
1515
"src/expr",
16+
"src/http-util",
1617
"src/interchange",
1718
"src/kafka-util",
1819
"src/materialized",

deny.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ skip = [
2424

2525
# Waiting on parse_duration.
2626
{ name = "num-bigint", version = "0.2.6" },
27+
28+
# waiting on https://github.com/hyperium/headers/pull/83
29+
{ name = "time", version = "0.1.44" },
30+
# this is already merged into tokio-postgres, just waiting for a release
31+
{ name = "socket2", version = "0.3.19" },
2732
]
2833
deny = [
2934
# Strum has suspect code quality and includes many unneeded features. Use

src/aws-util/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ publish = false
88
[dependencies]
99
anyhow = "1.0.40"
1010
log = "0.4.13"
11+
http-util = { path = "../http-util" }
1112
rusoto_core = { git = "https://github.com/rusoto/rusoto.git" }
1213
rusoto_credential = { git = "https://github.com/rusoto/rusoto.git" }
1314
rusoto_kinesis = { git = "https://github.com/rusoto/rusoto.git" }
1415
rusoto_s3 = { git = "https://github.com/rusoto/rusoto.git" }
15-
rusoto_sts = { git = "https://github.com/rusoto/rusoto.git" }
1616
rusoto_sqs = { git = "https://github.com/rusoto/rusoto.git" }
17+
rusoto_sts = { git = "https://github.com/rusoto/rusoto.git" }
1718
serde = { version = "1.0.124", features = ["derive"] }
1819
tokio = "1.4.0"

src/aws-util/src/aws.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//! Utility functions for AWS.
1111
1212
use anyhow::{anyhow, Context};
13-
use rusoto_core::{HttpClient, Region};
13+
use rusoto_core::Region;
1414
use rusoto_credential::{
1515
AutoRefreshingProvider, AwsCredentials, ChainProvider, ProvideAwsCredentials, StaticProvider,
1616
};
@@ -81,7 +81,8 @@ pub async fn account(
8181
region: Region,
8282
timeout: Duration,
8383
) -> Result<String, anyhow::Error> {
84-
let dispatcher = HttpClient::new().context("creating HTTP for AWS STS Account verification")?;
84+
let dispatcher =
85+
crate::client::http().context("creating HTTP client for AWS STS Account verification")?;
8586
let sts_client = StsClient::new_with(dispatcher, provider, region);
8687
let get_identity = sts_client.get_caller_identity(GetCallerIdentityRequest {});
8788
let account = time::timeout(timeout, get_identity)

src/aws-util/src/client.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
1515
use std::time::Duration;
1616

17-
use anyhow::Context;
17+
use anyhow::{anyhow, Context};
1818
use log::info;
1919
use rusoto_core::HttpClient;
2020
use rusoto_credential::{AutoRefreshingProvider, AwsCredentials, ChainProvider, StaticProvider};
@@ -24,6 +24,13 @@ use rusoto_sqs::SqsClient;
2424

2525
use crate::aws::ConnectInfo;
2626

27+
/// Get an [`HttpClient`] that respects the `http_proxy` environment variables
28+
pub(crate) fn http() -> Result<HttpClient<http_util::ProxiedConnector>, anyhow::Error> {
29+
Ok(HttpClient::from_connector(
30+
http_util::connector().map_err(|e| anyhow!(e))?,
31+
))
32+
}
33+
2734
/// Create a function that calls <Client>::new() with credentials sources
2835
///
2936
/// Unfortunately there is no trait that we can rely on in Rusoto to allow us
@@ -51,22 +58,24 @@ The [`AutoRefreshingProvider`] caches the underlying provider's AWS credentials,
5158
automatically fetching updated credentials if they've expired.
5259
"]
5360
pub async fn $name(conn_info: ConnectInfo) -> Result<$client, anyhow::Error> {
54-
let request_dispatcher = HttpClient::new().context("creating HTTP client for S3 client")?;
61+
let request_dispatcher = http().context(
62+
concat!("creating HTTP client for ", $client_name))?;
5563
let the_client = if let Some(creds) = conn_info.credentials {
56-
info!(concat!("Creating a new", stringify!(client), " from provided access_key and secret_access_key"));
64+
info!(concat!("Creating a new", $client_name, " from provided access_key and secret_access_key"));
5765
let provider = StaticProvider::from(AwsCredentials::from(creds));
5866

5967
$client::new_with(request_dispatcher, provider, conn_info.region)
6068
} else {
6169
info!(
6270
concat!("AWS access_key_id and secret_access_key not provided, \
63-
creating a new ", stringify!($client), " using a chain provider.")
71+
creating a new ", $client_name, " using a chain provider.")
6472
);
6573
let mut provider = ChainProvider::new();
6674

6775
provider.set_timeout(Duration::from_secs(10));
6876
let provider =
69-
AutoRefreshingProvider::new(provider).context(concat!("generating AWS credentials refreshing provider for ", stringify!($client)))?;
77+
AutoRefreshingProvider::new(provider).context(
78+
concat!("generating AWS credentials refreshing provider for ", $client_name))?;
7079

7180
$client::new_with(request_dispatcher, provider, conn_info.region)
7281
};

src/ccsr/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ edition = "2018"
66
publish = false
77

88
[dependencies]
9+
anyhow = "1.0.40"
910
futures = "0.3.13"
11+
http-util = { path = "../http-util" }
1012
native-tls = "0.2.7"
1113
openssl = { version = "0.10.33", features = ["vendored"] }
1214
reqwest = { version = "0.11.2", features = ["blocking", "json", "native-tls-vendored"] }
@@ -15,7 +17,7 @@ serde_json = "1.0.64"
1517
url = { version = "2.2.1", features = ["serde"] }
1618

1719
[dev-dependencies]
18-
anyhow = "1.0.40"
19-
hyper = "0.14.0"
20+
tracing = { version = "0.1.0", features = ["log"] }
21+
hyper = { version = "0.14.5", features = ["server"] }
2022
lazy_static = "1.4.0"
2123
tokio = { version = "1.4.0", features = ["macros"] }

src/ccsr/src/config.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
use anyhow::Context;
1011
use reqwest::Url;
1112
use serde::{Deserialize, Serialize};
1213

@@ -62,8 +63,9 @@ impl ClientConfig {
6263
}
6364

6465
/// Builds the [`Client`].
65-
pub fn build(self) -> Client {
66-
let mut builder = reqwest::Client::builder();
66+
pub fn build(self) -> Result<Client, anyhow::Error> {
67+
let mut builder = http_util::reqwest_client_builder()
68+
.context("Creating HTTP client for schema registry")?;
6769

6870
for root_cert in self.root_certs {
6971
builder = builder.add_root_certificate(root_cert.into());
@@ -78,6 +80,6 @@ impl ClientConfig {
7880
.build()
7981
.unwrap();
8082

81-
Client::new(inner, self.url, self.auth)
83+
Ok(Client::new(inner, self.url, self.auth))
8284
}
8385
}

src/ccsr/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
//! use ccsr::ClientConfig;
2424
//!
2525
//! let url = "http://localhost:8080".parse()?;
26-
//! let client = ClientConfig::new(url).build();
26+
//! let client = ClientConfig::new(url).build()?;
2727
//! let subjects = client.list_subjects().await?;
2828
//! for subject in subjects {
2929
//! let schema = client.get_schema_by_subject(&subject).await?;

0 commit comments

Comments
 (0)