Skip to content

Commit

Permalink
Merge pull request #1656 from cloudwego/release-v0.12.1
Browse files Browse the repository at this point in the history
chore: release v0.12.1
  • Loading branch information
jayantxie authored Jan 3, 2025
2 parents 0f3df4d + 11073fe commit d6f3824
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 26 deletions.
3 changes: 3 additions & 0 deletions client/genericclient/generic_stream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
if extra.GetExtra(generic.CombineServiceKey) == "true" {
svcInfo.Extra["combine_service"] = true
}
if pkg := extra.GetExtra("PackageName"); pkg != "" {
svcInfo.Extra["PackageName"] = pkg
}
}
return svcInfo
}
1 change: 1 addition & 0 deletions client/genericclient/generic_stream_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestGenericStreamService(t *testing.T) {
svcInfo := newClientStreamingServiceInfo(g)
test.Assert(t, svcInfo.Extra["generic"] == true)
test.Assert(t, svcInfo.Extra["combine_service"] == nil)
test.Assert(t, svcInfo.Extra["PackageName"] == "pbapi")
svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
return svcInfo.Methods[name]
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/bytedance/gopkg v0.1.1
github.com/bytedance/sonic v1.12.5
github.com/cloudwego/configmanager v0.2.2
github.com/cloudwego/dynamicgo v0.4.6
github.com/cloudwego/dynamicgo v0.4.7-0.20241220085612-55704ea4ca8f
github.com/cloudwego/fastpb v0.0.5
github.com/cloudwego/frugal v0.2.3
github.com/cloudwego/gopkg v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/configmanager v0.2.2 h1:sVrJB8gWYTlPV2OS3wcgJSO9F2/9Zbkmcm1Z7jempOU=
github.com/cloudwego/configmanager v0.2.2/go.mod h1:ppiyU+5TPLonE8qMVi/pFQk2eL3Q4P7d4hbiNJn6jwI=
github.com/cloudwego/dynamicgo v0.4.6 h1:raRdvLN1WsGl5WsNd2Ul86s8PFQPu8soF4ALSJ9MdC4=
github.com/cloudwego/dynamicgo v0.4.6/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY=
github.com/cloudwego/dynamicgo v0.4.7-0.20241220085612-55704ea4ca8f h1:IERXjxDg3Pbatb5z/dR8Qr8XUA1FpDVa73BnwbeQ76U=
github.com/cloudwego/dynamicgo v0.4.7-0.20241220085612-55704ea4ca8f/go.mod h1:DknfxjIMuGvXow409bS/AWycXONdc02HECBL0qpNqTY=
github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZU=
github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk=
github.com/cloudwego/frugal v0.2.3 h1:t1hhhAi8lXcx7Ncs4PR1pSZ90vlDU1cy5K2btDMFpoA=
Expand Down
5 changes: 4 additions & 1 deletion pkg/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ type ExtraProvider interface {
GetExtra(key string) string
}

const CombineServiceKey = "combine_service"
const (
CombineServiceKey = "combine_service"
packageNameKey = "PackageName"
)

// BinaryThriftGeneric raw thrift binary Generic
func BinaryThriftGeneric() Generic {
Expand Down
24 changes: 14 additions & 10 deletions pkg/generic/grpcjsonpb_test/generic_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net"
"strings"
"sync"
"time"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/genericclient"
Expand All @@ -33,11 +32,18 @@ import (
"github.com/cloudwego/kitex/transport"
)

func newGenericClient(g generic.Generic, targetIPPort string) genericclient.Client {
cli, err := genericclient.NewStreamingClient("destService", g,
client.WithTransportProtocol(transport.GRPC),
client.WithHostPorts(targetIPPort),
)
var startChan chan struct{}

func init() {
startChan = make(chan struct{})
server.RegisterStartHook(func() {
startChan <- struct{}{}
})
}

func newGenericClient(g generic.Generic, targetIPPort string, cliOpts ...client.Option) genericclient.Client {
cliOpts = append(cliOpts, client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(targetIPPort))
cli, err := genericclient.NewStreamingClient("destService", g, cliOpts...)
if err != nil {
panic(err)
}
Expand All @@ -53,6 +59,8 @@ func newMockTestServer(handler mock.Mock, addr net.Addr, opts ...server.Option)
panic(err)
}
}()
// wait for server starting to avoid data race
<-startChan
return svr
}

Expand Down Expand Up @@ -80,7 +88,6 @@ func (s *StreamingTestImpl) ClientStreamingTest(stream mock.Mock_ClientStreaming
}
fmt.Printf("Recv: %s\n", req.Message)
msgs = append(msgs, req.Message)
time.Sleep(time.Second)
}
return stream.SendAndClose(&mock.MockResp{Message: "all message: " + strings.Join(msgs, ", ")})
}
Expand All @@ -94,7 +101,6 @@ func (s *StreamingTestImpl) ServerStreamingTest(req *mock.MockReq, stream mock.M
if err != nil {
return err
}
time.Sleep(time.Second)
}
return
}
Expand All @@ -121,7 +127,6 @@ func (s *StreamingTestImpl) BidirectionalStreamingTest(stream mock.Mock_Bidirect
return
}
fmt.Printf("BidirectionaStreamingTest: received message = %s\n", msg.Message)
time.Sleep(time.Second)
}
}()

Expand All @@ -140,7 +145,6 @@ func (s *StreamingTestImpl) BidirectionalStreamingTest(stream mock.Mock_Bidirect
return
}
fmt.Printf("BidirectionaStreamingTest: sent message = %s\n", resp)
time.Sleep(time.Second)
}
}()
wg.Wait()
Expand Down
29 changes: 23 additions & 6 deletions pkg/generic/grpcjsonpb_test/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ import (
"strings"
"sync"
"testing"
"time"

dproto "github.com/cloudwego/dynamicgo/proto"
"github.com/tidwall/gjson"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/genericclient"
"github.com/cloudwego/kitex/internal/mocks/proto/kitex_gen/pbapi/mock"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)

Expand All @@ -51,7 +53,6 @@ func TestClientStreaming(t *testing.T) {
req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i)
err = streamCli.Send(req)
test.Assert(t, err == nil)
time.Sleep(100 * time.Millisecond)
}
resp, err := streamCli.CloseAndRecv()
test.Assert(t, err == nil)
Expand Down Expand Up @@ -84,7 +85,6 @@ func TestServerStreaming(t *testing.T) {
fmt.Printf("serverStreaming message received: %s\n", strResp)
test.Assert(t, strings.Contains(strResp, "grpc server streaming generic request ->"))
}
time.Sleep(100 * time.Millisecond)
}
}

Expand Down Expand Up @@ -127,7 +127,6 @@ func TestBidirectionalStreaming(t *testing.T) {
fmt.Printf("bidirectionalStreaming message received: %s\n", strResp)
test.Assert(t, strings.Contains(strResp, "th response"))
}
time.Sleep(100 * time.Millisecond)
}
}()
wg.Wait()
Expand All @@ -148,16 +147,34 @@ func TestUnary(t *testing.T) {
test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(), "hello unary request"))
}

func initStreamingClient(t *testing.T, ctx context.Context, addr, idl string) genericclient.Client {
func initStreamingClient(t *testing.T, ctx context.Context, addr, idl string, cliOpts ...client.Option) genericclient.Client {
dOpts := dproto.Options{}
p, err := generic.NewPbFileProviderWithDynamicGo(idl, ctx, dOpts)
test.Assert(t, err == nil)
g, err := generic.JSONPbGeneric(p)
test.Assert(t, err == nil)
return newGenericClient(g, addr)
return newGenericClient(g, addr, cliOpts...)
}

func initMockTestServer(handler mock.Mock, address string) server.Server {
addr, _ := net.ResolveTCPAddr("tcp", address)
return newMockTestServer(handler, addr)
}

func Test_invocationContainsPackage(t *testing.T) {
ctx := context.Background()
addr := test.GetLocalAddress()

svr := initMockTestServer(new(StreamingTestImpl), addr)
defer svr.Stop()
cli := initStreamingClient(t, ctx, addr, "./idl/pbapi.proto", client.WithMiddleware(func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
ri := rpcinfo.GetRPCInfo(ctx)
// make sure the package name has been injected into Invocation
test.Assert(t, ri.Invocation().PackageName() == "pbapi", ri.Invocation())
return nil
}
}))
_, err := genericclient.NewClientStreaming(ctx, cli, "ClientStreamingTest")
test.Assert(t, err == nil, err)
}
5 changes: 5 additions & 0 deletions pkg/generic/jsonpb_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func newJsonPbCodec(p PbDescriptorProviderDynamicGo, opts *Options) *jsonPbCodec
convOpts := opts.dynamicgoConvOpts
c.convOpts = convOpts
c.setCombinedServices(svc.IsCombinedServices())
c.setPackageName(svc.PackageName())

c.svcDsc.Store(svc)
go c.update()
Expand All @@ -82,6 +83,10 @@ func (c *jsonPbCodec) setCombinedServices(isCombinedServices bool) {
}
}

func (c *jsonPbCodec) setPackageName(pkg string) {
c.extra[packageNameKey] = pkg
}

func (c *jsonPbCodec) getMessageReaderWriter() interface{} {
pbSvc, ok := c.svcDsc.Load().(*dproto.ServiceDescriptor)
if !ok {
Expand Down
1 change: 1 addition & 0 deletions pkg/generic/jsonpb_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestJsonPbCodec(t *testing.T) {
test.Assert(t, method.StreamingMode == serviceinfo.StreamingNone)
test.Assert(t, jpc.svcName == "Echo")
test.Assert(t, jpc.extra[CombineServiceKey] == "false")
test.Assert(t, jpc.extra[packageNameKey] == "test")

rw := jpc.getMessageReaderWriter()
_, ok := rw.(gproto.MessageWriter)
Expand Down
6 changes: 6 additions & 0 deletions pkg/generic/thrift/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"github.com/cloudwego/gopkg/bufiox"
"github.com/cloudwego/gopkg/protocol/thrift"
"github.com/cloudwego/gopkg/protocol/thrift/base"
)

Expand All @@ -31,5 +32,10 @@ func NewWriteBinary() *WriteBinary {
}

func (w *WriteBinary) Write(ctx context.Context, out bufiox.Writer, msg interface{}, method string, isClient bool, requestBase *base.Base) error {
bw := thrift.NewBufferWriter(out)
defer bw.Recycle()
if err := bw.WriteFieldStop(); err != nil {
return err
}
return nil
}
11 changes: 6 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,13 @@ func (s *server) Run() (err error) {
if err != nil {
return err
}
s.Lock()
s.svr, err = remotesvr.NewServer(s.opt.RemoteOpt, transHdlr)
s.Unlock()
svr, err := remotesvr.NewServer(s.opt.RemoteOpt, transHdlr)
if err != nil {
return err
}
s.Lock()
s.svr = svr
s.Unlock()

// start profiler
if s.opt.RemoteOpt.Profiler != nil {
Expand All @@ -268,7 +269,7 @@ func (s *server) Run() (err error) {
})
}

errCh := s.svr.Start()
errCh := svr.Start()
select {
case err = <-errCh:
klog.Errorf("KITEX: server start error: error=%s", err.Error())
Expand All @@ -281,7 +282,7 @@ func (s *server) Run() (err error) {
}
muStartHooks.Unlock()
s.Lock()
s.buildRegistryInfo(s.svr.Address())
s.buildRegistryInfo(svr.Address())
s.Unlock()

if err = s.waitExit(errCh); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ package kitex
// Name and Version info of this framework, used for statistics and debug
const (
Name = "Kitex"
Version = "v0.12.0"
Version = "v0.12.1"
)

0 comments on commit d6f3824

Please sign in to comment.