Skip to content

Commit

Permalink
initialize http transport once for all client calls (#1008)
Browse files Browse the repository at this point in the history
- optimizes single transport for re-using connections
  across many calls.

- do not close keep-alive connections, keeping them
  re-usable to ensure that we do not build sockets in
  TIME_WAIT state.
  • Loading branch information
harshavardhana authored Feb 6, 2022
1 parent e5696e5 commit 34e4bdc
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 153 deletions.
102 changes: 13 additions & 89 deletions pkg/apis/minio.min.io/v2/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
Expand Down Expand Up @@ -610,14 +609,9 @@ func (t *Tenant) MinIOServerEndpoint() string {
}

// MinIOHealthCheck check MinIO cluster health
func (t *Tenant) MinIOHealthCheck() bool {
// Keep TLS config.
tlsConfig := &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true, // FIXME: use trusted CA
func (t *Tenant) MinIOHealthCheck(tr *http.Transport) bool {
if tr.TLSClientConfig != nil {
tr.TLSClientConfig.InsecureSkipVerify = true
}

req, err := http.NewRequest(http.MethodGet, t.MinIOServerEndpoint()+"/minio/health/cluster", nil)
Expand All @@ -626,26 +620,8 @@ func (t *Tenant) MinIOHealthCheck() bool {
}

httpClient := &http.Client{
Transport:
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
},
}
defer httpClient.CloseIdleConnections()
Transport: tr,
}

resp, err := httpClient.Do(req)
if err != nil {
Expand All @@ -656,12 +632,12 @@ func (t *Tenant) MinIOHealthCheck() bool {
}

// NewMinIOAdmin initializes a new madmin.Client for operator interaction
func (t *Tenant) NewMinIOAdmin(minioSecret map[string][]byte, caContent []byte) (*madmin.AdminClient, error) {
return t.NewMinIOAdminForAddress("", minioSecret, caContent)
func (t *Tenant) NewMinIOAdmin(minioSecret map[string][]byte, tr *http.Transport) (*madmin.AdminClient, error) {
return t.NewMinIOAdminForAddress("", minioSecret, tr)
}

// NewMinIOAdminForAddress initializes a new madmin.Client for operator interaction
func (t *Tenant) NewMinIOAdminForAddress(address string, minioSecret map[string][]byte, caContent []byte) (*madmin.AdminClient, error) {
func (t *Tenant) NewMinIOAdminForAddress(address string, minioSecret map[string][]byte, tr *http.Transport) (*madmin.AdminClient, error) {
host, accessKey, secretKey, err := t.getMinIOTenantDetails(address, minioSecret)
if err != nil {
return nil, err
Expand All @@ -676,32 +652,6 @@ func (t *Tenant) NewMinIOAdminForAddress(address string, minioSecret map[string]
if err != nil {
return nil, err
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
if opts.Secure {
rootCAs := mustGetSystemCertPool()
rootCAs.AppendCertsFromPEM(caContent)
tr.TLSClientConfig = &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
RootCAs: rootCAs,
}
}
madmClnt.SetCustomTransport(tr)

return madmClnt, nil
Expand Down Expand Up @@ -729,12 +679,12 @@ func (t *Tenant) getMinIOTenantDetails(address string, minioSecret map[string][]
}

// NewMinIOUser initializes a new console user
func (t *Tenant) NewMinIOUser(userCredentialSecrets []*corev1.Secret, caContent []byte) (*minio.Client, error) {
return t.NewMinIOUserForAddress("", userCredentialSecrets, caContent)
func (t *Tenant) NewMinIOUser(userCredentialSecrets []*corev1.Secret, tr *http.Transport) (*minio.Client, error) {
return t.NewMinIOUserForAddress("", userCredentialSecrets, tr)
}

// NewMinIOUserForAddress initializes a new console user
func (t *Tenant) NewMinIOUserForAddress(address string, userCredentialSecrets []*corev1.Secret, caContent []byte) (*minio.Client, error) {
func (t *Tenant) NewMinIOUserForAddress(address string, userCredentialSecrets []*corev1.Secret, tr *http.Transport) (*minio.Client, error) {
host := address
if host == "" {
host = t.MinIOServerHostAddress()
Expand All @@ -743,24 +693,6 @@ func (t *Tenant) NewMinIOUserForAddress(address string, userCredentialSecrets []
}
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}

rootCAs := mustGetSystemCertPool()
rootCAs.AppendCertsFromPEM(caContent)

for _, cred := range userCredentialSecrets {
consoleAccessKey, ok := cred.Data["CONSOLE_ACCESS_KEY"]
if !ok {
Expand All @@ -775,14 +707,6 @@ func (t *Tenant) NewMinIOUserForAddress(address string, userCredentialSecrets []
return nil, errors.New("CONSOLE_SECRET_KEY not provided")
}

tr.TLSClientConfig = &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
RootCAs: rootCAs,
}

opts := &minio.Options{
Transport: tr,
Secure: t.TLS(),
Expand All @@ -800,8 +724,8 @@ func (t *Tenant) NewMinIOUserForAddress(address string, userCredentialSecrets []
return nil, errors.New("no user credentials specified to initialize")
}

// mustGetSystemCertPool - return system CAs or empty pool in case of error (or windows)
func mustGetSystemCertPool() *x509.CertPool {
// MustGetSystemCertPool - return system CAs or empty pool in case of error (or windows)
func MustGetSystemCertPool() *x509.CertPool {
pool, err := x509.SystemCertPool()
if err != nil {
return x509.NewCertPool()
Expand Down
28 changes: 10 additions & 18 deletions pkg/controller/cluster/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ type Controller struct {
// Webhook server instance
ws *http.Server

// Client transport
transport *http.Transport

// monitor pods in the cluster to update the health information
podInformer cache.SharedIndexInformer

Expand Down Expand Up @@ -710,23 +713,12 @@ func (c *Controller) syncHandler(key string) error {
}
}

var caContent []byte
operatorCATLSCert, err := c.kubeClientSet.CoreV1().Secrets(miniov2.GetNSFromFile()).Get(ctx, "operator-ca-tls", metav1.GetOptions{})
// if custom ca.crt is not present in kubernetes secrets use the one stored in the pod
if err != nil {
caContent = miniov2.GetPodCAFromFile()
} else {
if val, ok := operatorCATLSCert.Data["ca.crt"]; ok {
caContent = val
}
}

tenantConfiguration, err := c.getTenantCredentials(ctx, tenant)
if err != nil {
return err
}

adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration, caContent)
adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration, c.getTransport())
if err != nil {
return err
}
Expand Down Expand Up @@ -841,7 +833,7 @@ func (c *Controller) syncHandler(key string) error {
klog.Infof("'%s/%s': Deploying pool %s", tenant.Namespace, tenant.Name, pool.Name)
// Check healthcheck for previous pool only if it's not a new setup,
// and check if they are online before adding this pool.
if addingNewPool && !tenant.MinIOHealthCheck() && len(tenant.Spec.Pools) > 1 {
if addingNewPool && !tenant.MinIOHealthCheck(c.getTransport()) && len(tenant.Spec.Pools) > 1 {
klog.Infof("'%s/%s': Deploying new pool failed %s: MinIO is not Ready", tenant.Namespace, tenant.Name, pool.Name)
return ErrMinIONotReady
}
Expand Down Expand Up @@ -892,7 +884,7 @@ func (c *Controller) syncHandler(key string) error {
break
}
podAddress := fmt.Sprintf("%s:9000", tenant.MinIOHLPodHostname(ssPod.Name))
podAdminClnt, err := tenant.NewMinIOAdminForAddress(podAddress, tenantConfiguration, caContent)
podAdminClnt, err := tenant.NewMinIOAdminForAddress(podAddress, tenantConfiguration, c.getTransport())
if err != nil {
return err
}
Expand Down Expand Up @@ -924,7 +916,7 @@ func (c *Controller) syncHandler(key string) error {
break
}
livePodAddress := fmt.Sprintf("%s:9000", tenant.MinIOHLPodHostname(livePod.Name))
livePodAdminClnt, err := tenant.NewMinIOAdminForAddress(livePodAddress, tenantConfiguration, caContent)
livePodAdminClnt, err := tenant.NewMinIOAdminForAddress(livePodAddress, tenantConfiguration, c.getTransport())
if err != nil {
return err
}
Expand Down Expand Up @@ -979,7 +971,7 @@ func (c *Controller) syncHandler(key string) error {
}
} else {
// check if MinIO is already online after the previous restart
if tenant.MinIOHealthCheck() {
if tenant.MinIOHealthCheck(c.getTransport()) {
tenant.Status.WaitingOnReady = nil
if _, err = c.updatePoolStatus(ctx, tenant); err != nil {
klog.Infof("'%s' Can't update tenant status: %v", key, err)
Expand All @@ -1005,7 +997,7 @@ func (c *Controller) syncHandler(key string) error {
// In loop above we compared all the versions in all pools.
// So comparing tenant.Spec.Image (version to update to) against one value from images slice is fine.
if tenant.Spec.Image != images[0] && tenant.Status.CurrentState != StatusUpdatingMinIOVersion {
if !tenant.MinIOHealthCheck() {
if !tenant.MinIOHealthCheck(c.getTransport()) {
klog.Infof("%s is not running can't update image online", key)
return ErrMinIONotReady
}
Expand Down Expand Up @@ -1206,7 +1198,7 @@ func (c *Controller) syncHandler(key string) error {
return err
}
// Make sure that MinIO is up and running to enable Log Search.
if !tenant.MinIOHealthCheck() {
if !tenant.MinIOHealthCheck(c.getTransport()) {
if _, err = c.updateTenantStatus(ctx, tenant, StatusWaitingForReadyState, totalReplicas); err != nil {
return err
}
Expand Down
29 changes: 8 additions & 21 deletions pkg/controller/cluster/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@ import (
"github.com/minio/madmin-go"

corev1 "k8s.io/api/core/v1"

k8serrors "k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/util/runtime"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"

miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"
Expand Down Expand Up @@ -116,18 +112,7 @@ func (c *Controller) updateHealthStatusForTenant(tenant *miniov2.Tenant) error {
return err
}

var caContent []byte
operatorCATLSCert, err := c.kubeClientSet.CoreV1().Secrets(miniov2.GetNSFromFile()).Get(context.Background(), "operator-ca-tls", metav1.GetOptions{})
// if custom ca.crt is not present in kubernetes secrets use the one stored in the pod
if err != nil {
caContent = miniov2.GetPodCAFromFile()
} else {
if val, ok := operatorCATLSCert.Data["ca.crt"]; ok {
caContent = val
}
}

adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration, caContent)
adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration, c.getTransport())
if err != nil {
// show the error and continue
klog.Infof("'%s/%s': %v", tenant.Namespace, tenant.Name, err)
Expand Down Expand Up @@ -284,7 +269,7 @@ const (
RegularMode = "RegularMode"
)

func getHealthCheckTransport() *http.Transport {
func getHealthCheckTransport() func() *http.Transport {
// Keep TLS config.
tlsConfig := &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
Expand All @@ -293,7 +278,7 @@ func getHealthCheckTransport() *http.Transport {
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true, // FIXME: use trusted CA
}
return &http.Transport{
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 2 * time.Second,
Expand All @@ -308,6 +293,9 @@ func getHealthCheckTransport() *http.Transport {
// in raw stream.
DisableCompression: true,
}
return func() *http.Transport {
return tr
}
}

// getMinIOHealthStatus returns the cluster health for a Tenant.
Expand Down Expand Up @@ -336,9 +324,8 @@ func getMinIOHealthStatusWithRetry(tenant *miniov2.Tenant, mode HealthMode, tryC
}

httpClient := &http.Client{
Transport: getHealthCheckTransport(),
Transport: getHealthCheckTransport()(),
}
defer httpClient.CloseIdleConnections()

resp, err := httpClient.Do(req)
if err != nil {
Expand Down
Loading

0 comments on commit 34e4bdc

Please sign in to comment.