Skip to content

Commit

Permalink
POC mongo: parse packets to and from the Target service
Browse files Browse the repository at this point in the history
  • Loading branch information
doodlesbykumbi committed Sep 14, 2020
1 parent b3c42e3 commit 64effdf
Show file tree
Hide file tree
Showing 19 changed files with 976 additions and 5 deletions.
17 changes: 12 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
module github.com/cyberark/secretless-broker

require (
github.com/aws/aws-sdk-go v1.15.79
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/aws/aws-sdk-go v1.29.15
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/codegangsta/cli v1.20.0
github.com/containerd/containerd v1.3.2 // indirect
Expand All @@ -17,25 +19,30 @@ require (
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
github.com/google/btree v1.0.0 // indirect
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gorilla/mux v1.7.4 // indirect
github.com/gotestyourself/gotestyourself v1.4.0 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/imdario/mergo v0.3.8 // indirect
github.com/joho/godotenv v1.2.0
github.com/joho/godotenv v1.3.0
github.com/json-iterator/go v1.1.8 // indirect
github.com/lib/pq v0.0.0-20180123210206-19c8e9ad0095
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.2.1
github.com/prometheus/client_golang v1.2.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529
github.com/stretchr/testify v1.4.0
go.mongodb.org/mongo-driver v1.4.1
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/yaml.v2 v2.2.2
gotest.tools v1.4.0 // indirect
k8s.io/api v0.0.0-20180712090710-2d6f90ab1293
k8s.io/apiextensions-apiserver v0.0.0-20180808065829-408db4a50408
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d
Expand Down
112 changes: 112 additions & 0 deletions go.sum

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions internal/plugin/connectors/tcp/mongodb/connection_details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package mongodb

import "strconv"

// DefaultMongoDbPort is the default port for Cql database connections
// over TCP
const DefaultMongoDbPort = 27017

var sslOptions = []string{
"sslrootcert",
"sslmode",
"sslkey",
"sslcert",
}

// ConnectionDetails stores the connection info to the target database.
type ConnectionDetails struct {
Host string
Port int
Username string
Password string
sslOptions map[string]string
}

const (
sslModeDisable = "disable"
sslModeRequire = "require"
sslModeVerifyCA = "verify-ca"
sslModeVerifyFull = "verify-full"
)

// NewConnectionDetails constructs a ConnectionDetails object based on the options passed
// in that are based on resolved configuration fields.
func NewConnectionDetails(options map[string][]byte) (*ConnectionDetails, error) {
connectionDetails := ConnectionDetails{
sslOptions: map[string]string{},
}

if len(options["host"]) > 0 {
connectionDetails.Host = string(options["host"])
}

connectionDetails.Port = DefaultMongoDbPort
if len(options["port"]) > 0 {
portStr := string(options["port"])
connectionDetails.Port, _ = strconv.Atoi(portStr)
}

if len(options["username"]) > 0 {
connectionDetails.Username = string(options["username"])
}

if len(options["password"]) > 0 {
connectionDetails.Password = string(options["password"])
}

for _, option := range sslOptions {
if value := options[option]; len(value) > 0 {
connectionDetails.sslOptions[option] = string(value)
}
}

return &connectionDetails, nil
}
34 changes: 34 additions & 0 deletions internal/plugin/connectors/tcp/mongodb/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mongodb

import (
"context"
"fmt"
"net"

"github.com/cyberark/secretless-broker/pkg/secretless/log"
"github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector"
)

// SingleUseConnector is passed the client's net.Conn and the current CredentialValuesById,
// and returns an authenticated net.Conn to the target service
type SingleUseConnector struct {
logger log.Logger
}

// Connect receives a connection to the client, and opens a connection to the target using the client's connection
// and the credentials provided in credentialValuesByID
func (connector *SingleUseConnector) Connect(
clientConn net.Conn,
credentialValuesByID connector.CredentialValuesByID,
) (net.Conn, error) {
connDetails, _ := NewConnectionDetails(credentialValuesByID)

host := net.JoinHostPort(connDetails.Host, fmt.Sprintf("%d", connDetails.Port))
dialer := newProxyDialer()
backendConn, err := dialer.DialContext(context.Background(), "tcp", host)
if err != nil {
return nil, err
}

return backendConn, nil
}
47 changes: 47 additions & 0 deletions internal/plugin/connectors/tcp/mongodb/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mongodb

import (
"net"

"github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector"
"github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector/tcp"
)

/*
NewConnector returns a tcp.Connector which returns an authenticated connection
to a target service for each incoming client connection. It is a required
method on the tcp.Plugin interface. The single argument passed in is of type
connector.Resources. It contains connector-specific config and a logger.
*/
func NewConnector(conRes connector.Resources) tcp.Connector {
connectorFunc := func(
clientConn net.Conn,
credentialValuesByID connector.CredentialValuesByID,
) (backendConn net.Conn, err error) {
conRes.Logger().DebugEnabled()
// singleUseConnector is responsible for generating the authenticated connection
// to the target service for each incoming client connection
singleUseConnector := &SingleUseConnector{
logger: conRes.Logger(),
}

return singleUseConnector.Connect(clientConn, credentialValuesByID)
}

return tcp.ConnectorFunc(connectorFunc)
}

// PluginInfo is required as part of the Secretless plugin spec. It provides important metadata about the plugin.
func PluginInfo() map[string]string {
return map[string]string{
"pluginAPIVersion": "0.1.0",
"type": "connector.tcp",
"id": "mongodb",
"description": "MongoDB Service Connector",
}
}

// GetTCPPlugin is required as part of the Secretless plugin spec for TCP connector plugins. It returns the TCP plugin.
func GetTCPPlugin() tcp.Plugin {
return tcp.ConnectorConstructor(NewConnector)
}
200 changes: 200 additions & 0 deletions internal/plugin/connectors/tcp/mongodb/proxy_dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongodb

import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"

"go.mongodb.org/mongo-driver/mongo/options"
)

// ProxyMessage represents a sent/received pair of parsed wire messages.
type ProxyMessage struct {
ServerAddress string
CommandName string
Sent *SentMessage
Received *ReceivedMessage
}

// proxyDialer is a ContextDialer implementation that wraps a net.Dialer and records the messages sent and received
// using connections created through it.
type proxyDialer struct {
*net.Dialer
sync.Mutex

messages []*ProxyMessage
// sentMap temporarily stores the message sent to the server using the requestID so it can map requests to their
// responses.
sentMap sync.Map
// addressTranslations maps dialed addresses to the remote addresses reported by the created connections if they
// differ. This can happen if a connection is dialed to a host name, in which case the reported remote address will
// be the resolved IP address.
addressTranslations sync.Map
}

var _ options.ContextDialer = (*proxyDialer)(nil)

func newProxyDialer() *proxyDialer {
return &proxyDialer{
Dialer: &net.Dialer{Timeout: 30 * time.Second},
}
}

func newProxyError(err error) error {
return fmt.Errorf("proxy error: %v", err)
}

func newProxyErrorWithWireMsg(_ []byte, err error) error {
return fmt.Errorf("proxy error for wiremessage: %v", err)
}

// DialContext creates a new proxyConnection.
func (p *proxyDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
netConn, err := p.Dialer.DialContext(ctx, network, address)
if err != nil {
return netConn, err
}

// If the connection's remote address does not match the dialed address, store it in the translations map for
// future look-up. Use the remote address as they key because that's what we'll have access to in the connection's
// Read/Write functions.
if remoteAddress := netConn.RemoteAddr().String(); remoteAddress != address {
p.addressTranslations.Store(remoteAddress, address)
}

proxy := &proxyConn{
Conn: netConn,
dialer: p,
}
return proxy, nil
}

func (p *proxyDialer) storeSentMessage(wm []byte) error {
p.Lock()
defer p.Unlock()

// Create a copy of the wire message so it can be parsed/stored and will not be affected if the wm slice is
// changed by the driver.
wmCopy := copyBytes(wm)
parsed, err := parseSentMessage(wmCopy)
if err != nil {
return err
}
fmt.Println("Sent: " + parsed.Command.String())
fmt.Println()
p.sentMap.Store(parsed.RequestID, parsed)
return nil
}

func (p *proxyDialer) storeReceivedMessage(wm []byte, addr string) error {
p.Lock()
defer p.Unlock()


serverAddress := addr
if translated, ok := p.addressTranslations.Load(addr); ok {
serverAddress = translated.(string)
}

// Create a copy of the wire message so it can be parsed/stored and will not be affected if the wm slice is
// changed by the driver. Parse the incoming message and get the corresponding outgoing message.
wmCopy := copyBytes(wm)

parsed, err := parseReceivedMessage(wmCopy)
if err != nil {
return err
}

mapValue, ok := p.sentMap.Load(parsed.ResponseTo)
if !ok {
return errors.New("no sent message found")
}
sent := mapValue.(*SentMessage)
p.sentMap.Delete(parsed.ResponseTo)

// Store the parsed message pair.
msgPair := &ProxyMessage{
// The command name is always the first key in the command document.
CommandName: sent.Command.Index(0).Key(),
ServerAddress: serverAddress,
Sent: sent,
Received: parsed,
}
fmt.Println("Received:", parsed.Response)
fmt.Println()
p.messages = append(p.messages, msgPair)
return nil
}

// Messages returns a slice of proxied messages. This slice is a copy of the messages proxied so far and will not be
// updated for messages proxied after this call.
func (p *proxyDialer) Messages() []*ProxyMessage {
p.Lock()
defer p.Unlock()

copiedMessages := make([]*ProxyMessage, 0, len(p.messages))
for _, msg := range p.messages {
copiedMessages = append(copiedMessages, msg)
}
return copiedMessages
}

// proxyConn is a net.Conn that wraps a network connection. All messages sent/received through a proxyConn are stored
// in the associated proxyDialer and are forwarded over the wrapped connection. Errors encountered when parsing and
// storing wire messages are wrapped to add context, while errors returned from the underlying network connection are
// forwarded without wrapping.
type proxyConn struct {
net.Conn
dialer *proxyDialer
}

// Write stores the given message in the proxyDialer associated with this connection and forwards the message to the
// server.
func (pc *proxyConn) Write(wm []byte) (n int, err error) {
if err := pc.dialer.storeSentMessage(wm); err != nil {
wrapped := fmt.Errorf("error storing sent message: %v", err)
//return 0, newProxyErrorWithWireMsg(wm, wrapped)
fmt.Println("Sent: Warning:", newProxyErrorWithWireMsg(wm, wrapped))
}

return pc.Conn.Write(wm)
}

// Read reads the message from the server into the given buffer and stores the read message in the proxyDialer
// associated with this connection.
func (pc *proxyConn) Read(wm []byte) (int, error) {
n, err := pc.Conn.Read(wm)
if err != nil {
return n, err
}
//
//// The driver reads wire messages in two phases: a four-byte read to get the length of the incoming wire message
//// and a (length-4) byte read to get the message itself. There's nothing to be stored during the initial four-byte
//// read because we can calculate the length from the rest of the message.
//if len(buffer) == 4 {
// return 4, nil
//}
//
//// The buffer contains the entire wire message except for the length bytes. Re-create the full message by appending
//// buffer to the end of a four-byte slice and using UpdateLength to set the length bytes.
//idx, wm := bsoncore.ReserveLength(nil)
//wm = append(wm, buffer...)
//wm = bsoncore.UpdateLength(wm, idx, int32(len(wm[idx:])))

if err := pc.dialer.storeReceivedMessage(wm, pc.RemoteAddr().String()); err != nil {
wrapped := fmt.Errorf("error storing received message: %v", err)
//return 0, newProxyErrorWithWireMsg(wm, wrapped)
fmt.Println("Received: Warning:", newProxyErrorWithWireMsg(wm, wrapped))
}

return n, nil
}
Loading

0 comments on commit 64effdf

Please sign in to comment.