From 35f506767bd687b8c2076deadbf44a59e8e07109 Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Tue, 19 Jul 2022 23:22:38 +0000
Subject: [PATCH 1/7] Add optional auth to websocket connections.

---
 src/sinks/websocket/config.rs |  5 ++++-
 src/sinks/websocket/sink.rs   | 15 +++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs
index e6ebed2ef9f9d..ceff49baf2ef3 100644
--- a/src/sinks/websocket/config.rs
+++ b/src/sinks/websocket/config.rs
@@ -5,6 +5,7 @@ use snafu::ResultExt;
 use crate::{
     codecs::EncodingConfig,
     config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
+    http::Auth,
     sinks::{
         websocket::sink::{ConnectSnafu, WebSocketConnector, WebSocketError, WebSocketSink},
         Healthcheck, VectorSink,
@@ -25,6 +26,7 @@ pub struct WebSocketSinkConfig {
         skip_serializing_if = "crate::serde::skip_serializing_if_default"
     )]
     pub acknowledgements: AcknowledgementsConfig,
+    pub auth: Option<Auth>,
 }
 
 impl GenerateConfig for WebSocketSinkConfig {
@@ -36,6 +38,7 @@ impl GenerateConfig for WebSocketSinkConfig {
             ping_interval: None,
             ping_timeout: None,
             acknowledgements: Default::default(),
+            auth: None,
         })
         .unwrap()
     }
@@ -70,7 +73,7 @@ impl SinkConfig for WebSocketSinkConfig {
 impl WebSocketSinkConfig {
     fn build_connector(&self) -> Result<WebSocketConnector, WebSocketError> {
         let tls = MaybeTlsSettings::from_config(&self.tls, false).context(ConnectSnafu)?;
-        WebSocketConnector::new(self.uri.clone(), tls)
+        WebSocketConnector::new(self.uri.clone(), tls, self.auth.clone())
     }
 }
 
diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs
index 0a952aad8e25d..8457c7804162f 100644
--- a/src/sinks/websocket/sink.rs
+++ b/src/sinks/websocket/sink.rs
@@ -38,6 +38,7 @@ use crate::{
     codecs::{Encoder, Transformer},
     dns, emit,
     event::{Event, EventStatus, Finalizable},
+    http::Auth,
     internal_events::{
         ConnectionOpen, OpenGauge, WsConnectionError, WsConnectionEstablished,
         WsConnectionFailedError, WsConnectionShutdown,
@@ -66,10 +67,11 @@ pub struct WebSocketConnector {
     host: String,
     port: u16,
     tls: MaybeTlsSettings,
+    auth: Option<Auth>,
 }
 
 impl WebSocketConnector {
-    pub fn new(uri: String, tls: MaybeTlsSettings) -> Result<Self, WebSocketError> {
+    pub fn new(uri: String, tls: MaybeTlsSettings, auth: Option<Auth>) -> Result<Self, WebSocketError> {
         let request = (&uri).into_client_request().context(CreateFailedSnafu)?;
         let (host, port) = Self::extract_host_and_port(&request).context(CreateFailedSnafu)?;
 
@@ -78,6 +80,7 @@ impl WebSocketConnector {
             host,
             port,
             tls,
+            auth,
         })
     }
 
@@ -118,9 +121,14 @@ impl WebSocketConnector {
     }
 
     async fn connect(&self) -> Result<WsStream<MaybeTlsStream<TcpStream>>, WebSocketError> {
-        let request = (&self.uri)
+        let mut request = (&self.uri)
             .into_client_request()
             .context(CreateFailedSnafu)?;
+
+        if let Some(auth) = &self.auth {
+            auth.apply(&mut request);
+        }
+
         let maybe_tls = self.tls_connect().await?;
 
         let ws_config = WebSocketConfig {
@@ -398,6 +406,7 @@ mod tests {
             ping_interval: None,
             ping_timeout: None,
             acknowledgements: Default::default(),
+            auth: None,
         };
         let tls = MaybeTlsSettings::Raw(());
 
@@ -427,6 +436,7 @@ mod tests {
             ping_timeout: None,
             ping_interval: None,
             acknowledgements: Default::default(),
+            auth: Some(Auth::Bearer{token: "OiJIUzI1NiIsInR5cCI6IkpXVCJ".to_string()}),
         };
 
         send_events_and_assert(addr, config, tls).await;
@@ -444,6 +454,7 @@ mod tests {
             ping_interval: None,
             ping_timeout: None,
             acknowledgements: Default::default(),
+            auth: None
         };
         let tls = MaybeTlsSettings::Raw(());
 

From e75c596b9d7ca46a26e77233431babf732f97a7b Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Wed, 20 Jul 2022 14:48:56 +0000
Subject: [PATCH 2/7] Format code.

---
 src/sinks/websocket/sink.rs | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs
index 8457c7804162f..6c63eb74ecda9 100644
--- a/src/sinks/websocket/sink.rs
+++ b/src/sinks/websocket/sink.rs
@@ -71,7 +71,11 @@ pub struct WebSocketConnector {
 }
 
 impl WebSocketConnector {
-    pub fn new(uri: String, tls: MaybeTlsSettings, auth: Option<Auth>) -> Result<Self, WebSocketError> {
+    pub fn new(
+        uri: String,
+        tls: MaybeTlsSettings,
+        auth: Option<Auth>,
+    ) -> Result<Self, WebSocketError> {
         let request = (&uri).into_client_request().context(CreateFailedSnafu)?;
         let (host, port) = Self::extract_host_and_port(&request).context(CreateFailedSnafu)?;
 
@@ -436,7 +440,9 @@ mod tests {
             ping_timeout: None,
             ping_interval: None,
             acknowledgements: Default::default(),
-            auth: Some(Auth::Bearer{token: "OiJIUzI1NiIsInR5cCI6IkpXVCJ".to_string()}),
+            auth: Some(Auth::Bearer {
+                token: "OiJIUzI1NiIsInR5cCI6IkpXVCJ".to_string(),
+            }),
         };
 
         send_events_and_assert(addr, config, tls).await;
@@ -454,7 +460,7 @@ mod tests {
             ping_interval: None,
             ping_timeout: None,
             acknowledgements: Default::default(),
-            auth: None
+            auth: None,
         };
         let tls = MaybeTlsSettings::Raw(());
 

From 8929ebbb82f646266e6288f3814c2a8706a58832 Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Thu, 21 Jul 2022 16:45:27 +0000
Subject: [PATCH 3/7] kick-ci


From 82875ea647aa477a47e4dfd90298ef571d749589 Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Fri, 5 Aug 2022 17:42:44 +0000
Subject: [PATCH 4/7] Add websocket auth header test.

---
 src/sinks/websocket/sink.rs | 101 ++++++++++++++++++++++++++----------
 1 file changed, 75 insertions(+), 26 deletions(-)

diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs
index 6c63eb74ecda9..6497a3e5a9544 100644
--- a/src/sinks/websocket/sink.rs
+++ b/src/sinks/websocket/sink.rs
@@ -385,6 +385,8 @@ mod tests {
     use tokio::time::timeout;
     use tokio_tungstenite::{
         accept_async,
+        accept_hdr_async,
+        tungstenite::handshake::server::{Request, Response},
         tungstenite::error::{Error as WsError, ProtocolError},
     };
 
@@ -414,7 +416,30 @@ mod tests {
         };
         let tls = MaybeTlsSettings::Raw(());
 
-        send_events_and_assert(addr, config, tls).await;
+        send_events_and_assert(addr, config, tls, None).await;
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_auth_websocket() {
+        trace_init();
+
+        let auth = Some(Auth::Bearer {
+            token: "OiJIUzI1NiIsInR5cCI6IkpXVCJ".to_string(),
+        });
+        let auth_clone = auth.clone();
+        let addr = next_addr();
+        let config = WebSocketSinkConfig {
+            uri: format!("ws://{}", addr),
+            tls: None,
+            encoding: JsonSerializerConfig::new().into(),
+            ping_interval: None,
+            ping_timeout: None,
+            acknowledgements: Default::default(),
+            auth,
+        };
+        let tls = MaybeTlsSettings::Raw(());
+
+        send_events_and_assert(addr, config, tls, auth_clone).await;
     }
 
     #[tokio::test(flavor = "multi_thread")]
@@ -440,12 +465,10 @@ mod tests {
             ping_timeout: None,
             ping_interval: None,
             acknowledgements: Default::default(),
-            auth: Some(Auth::Bearer {
-                token: "OiJIUzI1NiIsInR5cCI6IkpXVCJ".to_string(),
-            }),
+            auth: None,
         };
 
-        send_events_and_assert(addr, config, tls).await;
+        send_events_and_assert(addr, config, tls, None).await;
     }
 
     #[tokio::test]
@@ -464,7 +487,7 @@ mod tests {
         };
         let tls = MaybeTlsSettings::Raw(());
 
-        let mut receiver = create_count_receiver(addr, tls.clone(), true);
+        let mut receiver = create_count_receiver(addr, tls.clone(), true, None);
 
         let context = SinkContext::new_test();
         let (sink, _healthcheck) = config.build(context).await.unwrap();
@@ -480,7 +503,7 @@ mod tests {
         time::sleep(Duration::from_millis(500)).await;
         assert!(!receiver.await.is_empty());
 
-        let mut receiver = create_count_receiver(addr, tls, false);
+        let mut receiver = create_count_receiver(addr, tls, false, None);
         assert!(timeout(Duration::from_secs(10), receiver.connected())
             .await
             .is_ok());
@@ -490,8 +513,9 @@ mod tests {
         addr: SocketAddr,
         config: WebSocketSinkConfig,
         tls: MaybeTlsSettings,
+        auth: Option<Auth>,
     ) {
-        let mut receiver = create_count_receiver(addr, tls, false);
+        let mut receiver = create_count_receiver(addr, tls, false, auth);
 
         let context = SinkContext::new_test();
         let (sink, _healthcheck) = config.build(context).await.unwrap();
@@ -515,6 +539,7 @@ mod tests {
         addr: SocketAddr,
         tls: MaybeTlsSettings,
         interrupt_stream: bool,
+        auth: Option<Auth>,
     ) -> CountReceiver<String> {
         CountReceiver::receive_items_stream(move |tripwire, connected| async move {
             let listener = tls.bind(&addr).await.unwrap();
@@ -526,25 +551,49 @@ mod tests {
 
             let stream = stream
                 .take_until(tripwire)
-                .filter_map(|maybe_tls_stream| async move {
-                    let maybe_tls_stream = maybe_tls_stream.unwrap();
-                    let ws_stream = accept_async(maybe_tls_stream).await.unwrap();
-
-                    Some(
-                        ws_stream
-                            .filter_map(|msg| {
-                                future::ready(match msg {
-                                    Ok(msg) if msg.is_text() => Some(Ok(msg.into_text().unwrap())),
-                                    Err(WsError::Protocol(
-                                        ProtocolError::ResetWithoutClosingHandshake,
-                                    )) => None,
-                                    Err(e) => Some(Err(e)),
-                                    _ => None,
+                .filter_map(move |maybe_tls_stream| {
+                    let au = auth.clone();
+                    async move {
+                        let maybe_tls_stream = maybe_tls_stream.unwrap();
+                        let ws_stream = match au {
+                            Some(a) => {
+                                let auth_callback = |req: &Request, res: Response| {
+                                    let hdr = req.headers().get("Authorization");
+                                    if let Some(h) = hdr {
+                                        match a {
+                                            Auth::Bearer{token} => {
+                                                if format!("Bearer {}", token) != h.to_str().unwrap() {
+                                                    return Err(http::Response::<Option<String>>::new(None));
+                                                }
+                                            },
+                                            Auth::Basic{user: _user, password: _password} => {/* Not needed for tests at the moment */},
+                                        }
+                                    }
+                                    Ok(res)
+                                };
+                                accept_hdr_async(maybe_tls_stream, auth_callback).await.unwrap()
+                            },
+                            None => accept_async(maybe_tls_stream).await.unwrap(),
+                        };
+
+                        Some(
+                            ws_stream
+                                .filter_map(|msg| {
+                                    future::ready(match msg {
+                                        Ok(msg) if msg.is_text() => {
+                                            Some(Ok(msg.into_text().unwrap()))
+                                        },
+                                        Err(WsError::Protocol(
+                                            ProtocolError::ResetWithoutClosingHandshake,
+                                        )) => None,
+                                        Err(e) => Some(Err(e)),
+                                        _ => None,
+                                    })
                                 })
-                            })
-                            .take_while(|msg| future::ready(msg.is_ok()))
-                            .filter_map(|msg| future::ready(msg.ok())),
-                    )
+                                .take_while(|msg| future::ready(msg.is_ok()))
+                                .filter_map(|msg| future::ready(msg.ok())),
+                        )
+                    }
                 })
                 .map(move |ws_stream| {
                     connected.take().map(|trigger| trigger.send(()));

From ad42bc5d0c2b309567bc39883f7ad0662c0f827f Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Fri, 5 Aug 2022 17:53:06 +0000
Subject: [PATCH 5/7] Format code.

---
 src/sinks/websocket/sink.rs | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs
index 6497a3e5a9544..ee5dfbfe6ff67 100644
--- a/src/sinks/websocket/sink.rs
+++ b/src/sinks/websocket/sink.rs
@@ -384,10 +384,9 @@ mod tests {
     use serde_json::Value as JsonValue;
     use tokio::time::timeout;
     use tokio_tungstenite::{
-        accept_async,
-        accept_hdr_async,
-        tungstenite::handshake::server::{Request, Response},
+        accept_async, accept_hdr_async,
         tungstenite::error::{Error as WsError, ProtocolError},
+        tungstenite::handshake::server::{Request, Response},
     };
 
     use super::*;
@@ -561,18 +560,27 @@ mod tests {
                                     let hdr = req.headers().get("Authorization");
                                     if let Some(h) = hdr {
                                         match a {
-                                            Auth::Bearer{token} => {
-                                                if format!("Bearer {}", token) != h.to_str().unwrap() {
-                                                    return Err(http::Response::<Option<String>>::new(None));
+                                            Auth::Bearer { token } => {
+                                                if format!("Bearer {}", token)
+                                                    != h.to_str().unwrap()
+                                                {
+                                                    return Err(
+                                                        http::Response::<Option<String>>::new(None),
+                                                    );
                                                 }
-                                            },
-                                            Auth::Basic{user: _user, password: _password} => {/* Not needed for tests at the moment */},
+                                            }
+                                            Auth::Basic {
+                                                user: _user,
+                                                password: _password,
+                                            } => { /* Not needed for tests at the moment */ }
                                         }
                                     }
                                     Ok(res)
                                 };
-                                accept_hdr_async(maybe_tls_stream, auth_callback).await.unwrap()
-                            },
+                                accept_hdr_async(maybe_tls_stream, auth_callback)
+                                    .await
+                                    .unwrap()
+                            }
                             None => accept_async(maybe_tls_stream).await.unwrap(),
                         };
 
@@ -582,7 +590,7 @@ mod tests {
                                     future::ready(match msg {
                                         Ok(msg) if msg.is_text() => {
                                             Some(Ok(msg.into_text().unwrap()))
-                                        },
+                                        }
                                         Err(WsError::Protocol(
                                             ProtocolError::ResetWithoutClosingHandshake,
                                         )) => None,

From 97323f7bb71676afc4068b4cee13b1d4b6f5e232 Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Tue, 9 Aug 2022 16:19:25 +0000
Subject: [PATCH 6/7] Derive configurable for websocket sink auth.

---
 src/sinks/websocket/config.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs
index f05a0ef706cb3..8a4f4ec7a1826 100644
--- a/src/sinks/websocket/config.rs
+++ b/src/sinks/websocket/config.rs
@@ -43,6 +43,8 @@ pub struct WebSocketSinkConfig {
         skip_serializing_if = "crate::serde::skip_serializing_if_default"
     )]
     pub acknowledgements: AcknowledgementsConfig,
+
+    #[configurable(derived)]
     pub auth: Option<Auth>,
 }
 

From c438280219c81495ddbcb80e0d6dc27ab91630af Mon Sep 17 00:00:00 2001
From: Weston Carlson <wez470@gmail.com>
Date: Tue, 9 Aug 2022 19:27:20 +0000
Subject: [PATCH 7/7] Add auth config to websocket docs.

---
 website/cue/reference/components/sinks/websocket.cue | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/website/cue/reference/components/sinks/websocket.cue b/website/cue/reference/components/sinks/websocket.cue
index 6afb5215d231a..ca0c1dfe532ee 100644
--- a/website/cue/reference/components/sinks/websocket.cue
+++ b/website/cue/reference/components/sinks/websocket.cue
@@ -61,6 +61,10 @@ components: sinks: websocket: {
 	}
 
 	configuration: {
+		auth: configuration._http_auth & {_args: {
+			password_example: "${HTTP_PASSWORD}"
+			username_example: "${HTTP_USERNAME}"
+		}}
 		uri: {
 			description: """
 				The WebSocket URI to connect to. This should include the protocol and host,