diff --git a/client/client.go b/client/client.go index daf64e6..59d7462 100644 --- a/client/client.go +++ b/client/client.go @@ -17,6 +17,7 @@ limitations under the License. package client import ( + "net/http" "time" "k8s.io/apimachinery/pkg/runtime" @@ -28,7 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" - "github.com/clusterpedia-io/client-go/constants" + "github.com/clusterpedia-io/client-go/tools/transport" ) const ( @@ -37,46 +38,48 @@ const ( DefaultTimeoutSeconds = 10 ) +var Scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(Scheme)) + utilruntime.Must(clusterv1alpha2.AddToScheme(Scheme)) +} + func Client() (client.Client, error) { - restConfig, err := ctrl.GetConfig() + config, err := ctrl.GetConfig() if err != nil { return nil, err } - return newClient(restConfig) + return newClient(config) } func ClusterClient(cluster string) (client.Client, error) { - restConfig, err := ctrl.GetConfig() + config, err := ctrl.GetConfig() if err != nil { return nil, err } - return newClient(restConfig, cluster) + return newClient(config, cluster) } -func GetClient(restConfig *rest.Config, cluster ...string) (client.Client, error) { - return newClient(restConfig, cluster...) +func GetClient(config *rest.Config, cluster ...string) (client.Client, error) { + return newClient(config, cluster...) } -func newClient(restConfig *rest.Config, cluster ...string) (client.Client, error) { +func newClient(config *rest.Config, cluster ...string) (client.Client, error) { var err error - if len(cluster) == 1 { - restConfig, err = ClusterConfigFor(restConfig, cluster[0]) + config, err = ClusterConfigFor(config, cluster[0]) } else { - restConfig, err = ConfigFor(restConfig) + config, err = ConfigFor(config) } if err != nil { return nil, err } - scheme := runtime.NewScheme() - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(clusterv1alpha2.AddToScheme(scheme)) - - c, err := client.New(restConfig, client.Options{ - Scheme: scheme, + c, err := client.New(config, client.Options{ + Scheme: Scheme, }) if err != nil { return nil, err @@ -85,33 +88,27 @@ func newClient(restConfig *rest.Config, cluster ...string) (client.Client, error return c, nil } -func ConfigFor(cfg *rest.Config) (*rest.Config, error) { - configShallowCopy := *cfg - - // reset clusterpedia api path - if err := SetConfigDefaults(&configShallowCopy); err != nil { +func NewForConfig(cfg *rest.Config) (kubernetes.Interface, error) { + config, err := ConfigFor(cfg) + if err != nil { return nil, err } - return &configShallowCopy, nil -} - -func ClusterConfigFor(cfg *rest.Config, cluster string) (*rest.Config, error) { - configShallowCopy, err := ConfigFor(cfg) + kubeClient, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } - configShallowCopy.Host += constants.ClusterAPIPath + cluster - return configShallowCopy, nil + + return kubeClient, nil } -func NewForConfig(cfg *rest.Config) (kubernetes.Interface, error) { - clientConfig, err := ConfigFor(cfg) +func NewClusterForConfig(cfg *rest.Config, cluster string) (kubernetes.Interface, error) { + config, err := ClusterConfigFor(cfg, cluster) if err != nil { return nil, err } - kubeClient, err := kubernetes.NewForConfig(clientConfig) + kubeClient, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } @@ -119,22 +116,35 @@ func NewForConfig(cfg *rest.Config) (kubernetes.Interface, error) { return kubeClient, nil } -func NewClusterForConfig(cfg *rest.Config, cluster string) (kubernetes.Interface, error) { - clientConfig, err := ClusterConfigFor(cfg, cluster) - if err != nil { +func ConfigFor(cfg *rest.Config) (*rest.Config, error) { + configShallowCopy := *cfg + if err := SetConfigDefaults(&configShallowCopy); err != nil { return nil, err } - kubeClient, err := kubernetes.NewForConfig(clientConfig) - if err != nil { + // wrap a transport to rest client config + configShallowCopy.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return transport.NewTransport(configShallowCopy.Host, rt) + }) + + return &configShallowCopy, nil +} + +func ClusterConfigFor(cfg *rest.Config, cluster string) (*rest.Config, error) { + configShallowCopy := *cfg + if err := SetConfigDefaults(&configShallowCopy); err != nil { return nil, err } - return kubeClient, nil + // wrap a cluster transport to rest client config + configShallowCopy.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return transport.NewTransportWithCluster(configShallowCopy.Host, cluster, rt) + }) + + return &configShallowCopy, nil } func SetConfigDefaults(config *rest.Config) error { - config.Host += constants.ClusterPediaAPIPath if config.Timeout == 0 { config.Timeout = DefaultTimeoutSeconds * time.Second } diff --git a/examples/clusterpedia-client/main.go b/examples/clusterpedia-client/main.go index 474c98e..6467653 100644 --- a/examples/clusterpedia-client/main.go +++ b/examples/clusterpedia-client/main.go @@ -1,62 +1,28 @@ -/* -Copyright 2021 clusterpedia Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package main import ( "context" "fmt" - clusterpediaclient "github.com/clusterpedia-io/client-go/clusterpediaclient" - "github.com/clusterpedia-io/client-go/tools/builder" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" + + "github.com/clusterpedia-io/client-go/client" ) func main() { - restConfig, err := ctrl.GetConfig() + config, err := ctrl.GetConfig() if err != nil { panic(err) } - cc, err := clusterpediaclient.NewForConfig(restConfig) + client, err := client.NewClusterForConfig(config, "cluster1") if err != nil { panic(err) } - collectionResource, err := cc.PediaClusterV1beta1().CollectionResource().List(context.TODO(), metav1.ListOptions{}) + pod, err := client.CoreV1().Pods("default").Get(context.TODO(), "pod1", metav1.GetOptions{}) if err != nil { panic(err) } - - for _, item := range collectionResource.Items { - fmt.Printf("resource info: %v\n", item) - } - - // build listOptions - options := builder.ListOptionsBuilder(). - Namespaces("kube-system"). - Options() - - resources, err := cc.PediaClusterV1beta1().CollectionResource().Fetch(context.TODO(), "workflows", options, nil) - if err != nil { - panic(err) - } - - for _, item := range resources.Items { - fmt.Printf("resource info: %v\n", item) - } + fmt.Println(pod) } diff --git a/examples/collectionresource-client/main.go b/examples/collectionresource-client/main.go new file mode 100644 index 0000000..29a5f55 --- /dev/null +++ b/examples/collectionresource-client/main.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 clusterpedia Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + + clusterpediaclient "github.com/clusterpedia-io/client-go/clusterpediaclient" + "github.com/clusterpedia-io/client-go/tools/builder" +) + +func main() { + restConfig, err := ctrl.GetConfig() + if err != nil { + panic(err) + } + cc, err := clusterpediaclient.NewForConfig(restConfig) + if err != nil { + panic(err) + } + + collectionResource, err := cc.PediaClusterV1beta1().CollectionResource().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + panic(err) + } + + for _, item := range collectionResource.Items { + fmt.Printf("resource info: %v\n", item) + } + + // build listOptions + options := builder.ListOptionsBuilder(). + Namespaces("kube-system"). + Options() + + resources, err := cc.PediaClusterV1beta1().CollectionResource().Fetch(context.TODO(), "workflows", options, nil) + if err != nil { + panic(err) + } + + for _, item := range resources.Items { + fmt.Printf("resource info: %v\n", item) + } +} diff --git a/examples/transport/main.go b/examples/transport/main.go new file mode 100644 index 0000000..82a2848 --- /dev/null +++ b/examples/transport/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/http" + + "github.com/clusterpedia-io/client-go/tools/transport" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" +) + +func main() { + config, err := ctrl.GetConfig() + if err != nil { + log.Fatalf("failed to init config: %v", err) + } + config.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return transport.NewTransportWithCluster(config.Host, "cluster", rt) + }) + + client, err := clientset.NewForConfig(config) + if err != nil { + log.Fatalf("failed to init clientset: %v", err) + } + + pod, err := client.CoreV1().Pods("demo-system").Get(context.TODO(), "pod1", metav1.GetOptions{}) + if err != nil { + log.Fatalf("failed to list pods: %v", err) + } + fmt.Println(pod) +} diff --git a/tools/transport/round_trippers.go b/tools/transport/round_trippers.go new file mode 100644 index 0000000..91f18fb --- /dev/null +++ b/tools/transport/round_trippers.go @@ -0,0 +1,89 @@ +/* +Copyright 2021 clusterpedia Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transport + +import ( + "net/http" + "net/url" + "path" + "regexp" + "strings" + + "github.com/clusterpedia-io/client-go/constants" +) + +var ( + // regex matches "/apis/clusterpedia.io/{version}/{path}" + clusterpediaAPIsRegex = regexp.MustCompile(`^(/apis/clusterpedia\.io/v\w*.)/(.*)`) +) + +// ClusternetTransport is a transport to redirect requests to clusternet-hub +type ClusterpediaTransport struct { + // relative paths may omit leading slash + path string + cluster string + + rt http.RoundTripper +} + +func (t *ClusterpediaTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.normalizeLocation(req.URL) + return t.rt.RoundTrip(req) +} + +// normalizeLocation format the request URL to Clusternet shadow GVKs +func (t *ClusterpediaTransport) normalizeLocation(location *url.URL) { + curPath := location.Path + // Trim returns a slice of the string s with all leading and trailing Unicode code points contained in cutset removed. + // so we use Replace here + reqPath := strings.Replace(curPath, t.path, "", 1) + + // we don't normalize request for Group clusterpedia.io + if clusterpediaAPIsRegex.MatchString(reqPath) { + return + } + + paths := []string{constants.ClusterPediaAPIPath} + if len(t.cluster) > 0 { + paths = append(paths, "clusters", t.cluster) + } + location.Path = path.Join(append(paths, reqPath)...) +} + +func NewTransportWithCluster(host, cluster string, rt http.RoundTripper) *ClusterpediaTransport { + // host must be a host string, a host:port pair, or a URL to the base of the apiserver. + // If a URL is given then the (optional) Path of that URL represents a prefix that must + // be appended to all request URIs used to access the apiserver. This allows a frontend + // proxy to easily relocate all of the apiserver endpoints. + return &ClusterpediaTransport{ + path: urlMustParse(host).Path, + cluster: cluster, + rt: rt, + } +} + +func NewTransport(host string, rt http.RoundTripper) *ClusterpediaTransport { + return NewTransportWithCluster(host, "", rt) +} + +func urlMustParse(path string) *url.URL { + location, err := url.Parse(strings.TrimRight(path, "/")) + if err != nil { + panic(err) + } + return location +}