Skip to content

Commit 73e7a4e

Browse files
benmannsdigikata
andauthored
Pass headers to websocket requests (#244)
Pass headers to websocket requests --------- Co-authored-by: Alan Chen <[email protected]>
1 parent fb82717 commit 73e7a4e

File tree

6 files changed

+99
-7
lines changed

6 files changed

+99
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/http-source/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ humantime-serde = { version = "1.1", default-features = false }
2424
tokio-stream = { version = "0.1", default-features = false, features = ["time"] }
2525
tokio = { version = "1.40", default-features = false, features = ["time"] }
2626
tokio-tungstenite = { version = "0.21.0", features = [ "rustls-tls-webpki-roots" ] }
27-
# tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] }
2827
encoding_rs = { version = "0.8", default-features = false }
2928
mime = { version = "0.3", default-features = false }
29+
http = "1.1"
3030

3131
fluvio = { git = "https://github.com/infinyon/fluvio", branch = "wasmtime_21" }
3232
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", branch = "wasmtime_21", features = ["derive"] }

crates/http-source/src/websocket_source.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use tokio::net::TcpStream;
1414
use tokio::time::Duration;
1515
use tokio_stream::{wrappers::IntervalStream, StreamExt};
1616
use tokio_tungstenite::{
17-
connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
17+
connect_async,
18+
tungstenite::{client::IntoClientRequest, protocol::Message},
19+
MaybeTlsStream, WebSocketStream,
1820
};
1921
use url::Url;
2022

@@ -27,7 +29,7 @@ pub(crate) struct WebSocketSource {
2729

2830
#[derive(Clone)]
2931
struct WSRequest {
30-
url: Url,
32+
request: tokio_tungstenite::tungstenite::handshake::client::Request,
3133
subscription_message: Option<String>,
3234
}
3335

@@ -54,9 +56,9 @@ impl PingStream for WSPingOnlySink {
5456
}
5557

5658
async fn establish_connection(request: WSRequest) -> Result<WebSocketStream<Transport>> {
57-
match connect_async(&request.url).await {
59+
match connect_async(request.request.clone()).await {
5860
Ok((mut ws_stream, _)) => {
59-
info!("WebSocket connected to {}", &request.url);
61+
info!("WebSocket connected to {}", &request.request.uri());
6062
if let Some(message) = request.subscription_message.as_ref() {
6163
ws_stream.send(Message::Text(message.to_owned())).await?;
6264
}
@@ -128,10 +130,31 @@ async fn websocket_writer_and_stream<'a>(
128130
impl WebSocketSource {
129131
pub(crate) fn new(config: &HttpConfig) -> Result<Self> {
130132
let ws_config = config.websocket_config.as_ref();
133+
134+
let mut request = Url::parse(&config.endpoint.resolve()?)?.into_client_request()?;
135+
let headers = request.headers_mut();
136+
137+
for h in config.headers.iter() {
138+
match h.resolve() {
139+
Ok(h) => {
140+
if let Some((key, value)) = h.split_once(':') {
141+
headers.insert(
142+
http::HeaderName::from_bytes(key.as_bytes())?,
143+
value.parse()?,
144+
);
145+
} else {
146+
error!("Failed to split header");
147+
}
148+
}
149+
Err(e) => {
150+
error!("Failed to resolve header: {}", e);
151+
}
152+
}
153+
}
154+
131155
Ok(Self {
132156
request: WSRequest {
133-
url: Url::parse(&config.endpoint.resolve()?)
134-
.context("unable to parse http endpoint")?,
157+
request,
135158
subscription_message: ws_config.and_then(|c| c.subscription_message.to_owned()),
136159
},
137160
ping_interval_ms: ws_config.and_then(|c| c.ping_interval_ms).unwrap_or(10_000),

crates/mock-http-server/src/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ async fn main() -> tide::Result<()> {
3838
}
3939
Ok(())
4040
}));
41+
app.at("/websocket-auth")
42+
.get(WebSocket::new(|request, stream| async move {
43+
let header_values = request.header("x-secret-token");
44+
if header_values.is_none() || header_values.unwrap().last() != "abc123" {
45+
stream.send_string("Unauthorized".to_string()).await?;
46+
return Ok(());
47+
}
48+
49+
for i in 1..11 {
50+
stream
51+
.send_string(format!("Hello, Fluvio! - {}", i))
52+
.await?;
53+
}
54+
Ok(())
55+
}));
4156

4257
app.listen("127.0.0.1:8080").await?;
4358
Ok(())
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
meta:
2+
version: latest
3+
name: websocket-headers-connector
4+
type: websocket-source
5+
topic: TOPIC
6+
create_topic: false
7+
producer:
8+
linger: 0ms
9+
http:
10+
endpoint: ws://127.0.0.1:8080/websocket-auth
11+
headers:
12+
- "x-secret-token: abc123"

tests/websocket-headers-test.bats

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#!/usr/bin/env bats
2+
3+
setup() {
4+
cargo build -p mock-http-server
5+
./target/debug/mock-http-server & disown
6+
MOCK_PID=$!
7+
FILE=$(mktemp)
8+
cp ./tests/websocket-headers-test-config.yaml $FILE
9+
UUID=$(uuidgen | awk '{print tolower($0)}')
10+
TOPIC=${UUID}-topic
11+
fluvio topic create $TOPIC
12+
13+
sed -i.BAK "s/TOPIC/${TOPIC}/g" $FILE
14+
cat $FILE
15+
16+
cargo build -p http-source
17+
./target/debug/http-source --config $FILE & disown
18+
CONNECTOR_PID=$!
19+
}
20+
21+
teardown() {
22+
fluvio topic delete $TOPIC
23+
kill $MOCK_PID
24+
kill $CONNECTOR_PID
25+
}
26+
27+
@test "websocket-connector-test" {
28+
count=1
29+
echo "Starting consumer on topic $TOPIC"
30+
sleep 13
31+
32+
fluvio consume -B -d $TOPIC | while read input; do
33+
expected="Hello, Fluvio! - $count"
34+
echo $input = $expected
35+
[ "$input" = "$expected" ]
36+
count=$(($count + 1))
37+
if [ $count -eq 10 ]; then
38+
break;
39+
fi
40+
done
41+
}

0 commit comments

Comments
 (0)