Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jpts committed Jan 7, 2022
0 parents commit 64df881
Show file tree
Hide file tree
Showing 6 changed files with 1,244 additions and 0 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# kubectl-execws

A replacement for "kubectl exec" that works over WebSocket connections.

Kubernetes API server has support for exec over WebSockets, but it has yet to land in kubectl. This plugin is designed to be a stopgap until then!

Usage:
```
execws <pod name> [--kubeconfig] [-n namespace] [-it] [-c container] <cmd>
```

### Acknowledgements

Work inspired by [rmohr/kubernetes-custom-exec](https://github.com/rmohr/kubernetes-custom-exec) and [kairen/websocket-exec](https://github.com/kairen/websocket-exec).
248 changes: 248 additions & 0 deletions cmd/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package cmd

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"

"github.com/gorilla/websocket"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

type Options struct {
Command []string
Container string
Kconfig string
Namespace string
Object string
Pod string
Stdin bool
TTY bool
}

var cfg clientcmd.ClientConfig

var protocols = []string{
"v4.channel.k8s.io",
"v3.channel.k8s.io",
"v2.channel.k8s.io",
"channel.k8s.io",
}

var cacheBuff bytes.Buffer

const (
stdin = iota
stdout
stderr
)

// prep a http req
func prepExec(opts *Options) (*http.Request, error) {
//var cfg clientcmd.ClientConfig
switch opts.Kconfig {
case "":
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
cfg = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
loadingRules,
&clientcmd.ConfigOverrides{})
default:
cfg = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kconfig},
&clientcmd.ConfigOverrides{})
}
clientConf, err := cfg.ClientConfig()
if err != nil {
return nil, err
}

var namespace string
switch opts.Namespace {
case "":
namespace, _, err = cfg.Namespace()
if err != nil {
return nil, err
}
default:
namespace = opts.Namespace
}

u, err := url.Parse(clientConf.Host)
if err != nil {
return nil, err
}

switch u.Scheme {
case "https":
u.Scheme = "wss"
case "http":
u.Scheme = "ws"
default:
return nil, fmt.Errorf("Malformed URL %s", u.String())
}

u.Path = fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/exec", namespace, opts.Pod)
rawQuery := "stdout=true&stderr=true"
for _, c := range opts.Command {
rawQuery += "&command=" + c
}

if opts.Container != "" {
rawQuery += "&container=" + opts.Container
}

if opts.TTY {
rawQuery += "&tty=true"
}

if opts.Stdin {
rawQuery += "&stdin=true"
}
u.RawQuery = rawQuery

req := &http.Request{
Method: http.MethodGet,
URL: u,
}

return req, nil

}

type RoundTripCallback func(conn *websocket.Conn) error

type WebsocketRoundTripper struct {
Dialer *websocket.Dialer
Callback RoundTripCallback
}

//req -> ws callback
func doExec(req *http.Request) error {
config, err := cfg.ClientConfig()
if err != nil {
return err
}

tlsConfig, err := rest.TLSConfigFor(config)
if err != nil {
return err
}

dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConfig,
Subprotocols: protocols,
}

rt := &WebsocketRoundTripper{
Callback: WsCallback,
Dialer: dialer,
}

rter, err := rest.HTTPWrappersForConfig(config, rt)
if err != nil {
return err
}

_, err = rter.RoundTrip(req)
if err != nil {
return err

}
return nil
}

type ApiServerError struct {
Reason string `json:"reason"`
Message string `json:"message"`
}

func (d *WebsocketRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
conn, resp, err := d.Dialer.Dial(r.URL.String(), r.Header)
if err != nil {
var msg ApiServerError
err := json.NewDecoder(resp.Body).Decode(&msg)
// should probably match 400-599 here
if resp.StatusCode != 101 || err != nil {
errmsg := fmt.Sprintf("Error from server (%s): %s", msg.Reason, msg.Message)
return nil, errors.New(errmsg)
} else {
return nil, err
}
}
defer conn.Close()
return resp, d.Callback(conn)
}

func WsCallback(ws *websocket.Conn) error {
errChan := make(chan error, 3)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
buf := make([]byte, 1025)
for {
n, err := os.Stdin.Read(buf[1:])
if err != nil {
errChan <- err
return
}

cacheBuff.Write(buf[1:n])
cacheBuff.Write([]byte{13, 10})
if err := ws.WriteMessage(websocket.BinaryMessage, buf[:n+1]); err != nil {
errChan <- err
return
}
}
}()

go func() {
defer wg.Done()
for {
_, buf, err := ws.ReadMessage()
if err != nil {
errChan <- err
return
}

if len(buf) > 1 {
var w io.Writer
switch buf[0] {
case stdout:
w = os.Stdout
case stderr:
w = os.Stderr
}

if w == nil {
continue
}
s := strings.Replace(string(buf[1:]), cacheBuff.String(), "", -1)
_, err = w.Write([]byte(s))
if err != nil {
errChan <- err
return
}
}
cacheBuff.Reset()
}
}()

wg.Wait()
close(errChan)
err := <-errChan
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return nil
} else {
return err
}
}
84 changes: 84 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmd

import (
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
)

var (
kconfig string
tty bool
stdinFlag bool
quiet bool
container string
namespace string
)

var rootCmd = &cobra.Command{
Use: "execws <pod name> [--kubeconfig] [-n namespace] [-it] [-c container] <cmd>",
Short: "kubectl exec over WebSockets",
Long: `A replacement for "kubectl exec" that works over WebSocket connections.`,
Args: cobra.MinimumNArgs(2),
Run: func(cmd *cobra.Command, args []string) {
var object, pod string
var command []string
if len(args) == 3 {
object = args[0]
pod = args[1]
command = args[2:]
} else if strings.Contains(args[0], "/") {
parts := strings.Split(args[0], "/")
object = parts[0]
pod = parts[1]
command = args[1:]
} else if len(args) == 2 {
object = "pod"
pod = args[0]
command = args[1:]
} else {
fmt.Println("bad input")
os.Exit(1)
}

opts := &Options{
Command: command,
Container: container,
Kconfig: kconfig,
Namespace: namespace,
Object: object,
Pod: pod,
Stdin: stdinFlag,
TTY: tty,
}

req, _ := prepExec(opts)

err := doExec(req)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}

},
}

func Execute() {
err := rootCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().StringVar(&kconfig, "kubeconfig", "", "kubeconfig file (default is $HOME/.kube/config)")
rootCmd.PersistentFlags().StringVarP(&namespace, "namespace", "n", "", "Override \"default\" namespace")
rootCmd.Flags().BoolVarP(&tty, "tty", "t", false, "Stdin is a TTY")
rootCmd.Flags().BoolVarP(&stdinFlag, "stdin", "i", false, "Pass stdin to container")
rootCmd.Flags().StringVarP(&container, "container", "c", "", "Container name")
//rootCmd.Flags().BoolVarP(&quiet, "quiet", "q", false, "")
//rootCmd.Flags().BoolVarP(&verb, "verbose", "v", false, "")

}
14 changes: 14 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/jpts/kubectl-execws

go 1.16

require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 // indirect
github.com/gophercloud/gophercloud v0.1.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/spf13/cobra v1.3.0
k8s.io/client-go v0.21.8
k8s.io/klog v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e // indirect
)
Loading

0 comments on commit 64df881

Please sign in to comment.