Skip to content

Commit

Permalink
Event guarantee feature (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
tahadostifam authored Aug 30, 2024
1 parent e23e813 commit 0d03f83
Show file tree
Hide file tree
Showing 21 changed files with 52 additions and 895 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ vendor/
minio_data/*
logs/logs*.json
coverage.out*
./protobuf/gen/es/protobuf/node_modules
protobuf/gen/es/protobuf/node_modules
3 changes: 2 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/auth/v1/authv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/chat/v1/chatv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/events/v1/eventsv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/messagev1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/v1/messagev1connect"

"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/search/v1/searchv1connect"
"github.com/kavkaco/Kavka-Core/utils/hash"
"github.com/rs/cors"
Expand Down
19 changes: 7 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@ type Env int
const (
Development Env = iota
Production
Test
)

var (
CurrentEnv Env = Development
filename string
)
var CurrentEnv Env = Development

type (
Config struct {
Expand Down Expand Up @@ -108,25 +104,24 @@ func ConfigsDirPath() string {
}

func Read() *Config {
var fileName string

// Load KAVKA ENV
env := strings.ToLower(os.Getenv("KAVKA_ENV"))

if len(strings.TrimSpace(env)) == 0 || env == "development" {
CurrentEnv = Development
filename = "config.development.yml"
fileName = "config.development.yml"
} else if env == "production" {
CurrentEnv = Production
filename = "config.production.yml"
} else if env == "test" {
CurrentEnv = Test
filename = "config.test.yml"
fileName = "config.production.yml"
} else {
panic(errors.New("Invalid env value set for variable KAVKA_ENV: " + env))
log.Fatalln(errors.New("Invalid env value set for variable KAVKA_ENV: " + env))
}

// Load YAML configs
k := koanf.New(ConfigsDirPath())
if err := k.Load(file.Provider(fmt.Sprintf("%s/%s", ConfigsDirPath(), filename)), yaml.Parser()); err != nil {
if err := k.Load(file.Provider(fmt.Sprintf("%s/%s", ConfigsDirPath(), fileName)), yaml.Parser()); err != nil {
log.Fatalf("error loading config: %v", err)
}
config := &Config{}
Expand Down
39 changes: 0 additions & 39 deletions config/config.test.yml

This file was deleted.

20 changes: 0 additions & 20 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"errors"
"os"
"testing"

Expand All @@ -20,22 +19,3 @@ func TestDevelopmentConfig(t *testing.T) {
require.NotEmpty(t, configs)
require.Equal(t, CurrentEnv, Development)
}

func TestTestConfig(t *testing.T) {
os.Setenv("KAVKA_ENV", "test")

Read()
require.Equal(t, CurrentEnv, Test)
}

func TestFunctionPanics(t *testing.T) {
os.Setenv("KAVKA_ENV", "panic")

defer func() {
if r := recover(); r == nil {
require.Error(t, errors.New("Expected panic but did not panic"))
}
}()

Read()
}
17 changes: 8 additions & 9 deletions delivery/grpc/handlers/auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/kavkaco/Kavka-Core/internal/service/auth"
authv1 "github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/auth/v1"
"github.com/kavkaco/Kavka-Core/protobuf/proto_model_transformer"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand All @@ -24,7 +23,7 @@ func NewAuthGrpcHandler(authService auth.AuthService) AuthGrpcServer {
func (a AuthGrpcServer) Login(ctx context.Context, req *connect.Request[authv1.LoginRequest]) (*connect.Response[authv1.LoginResponse], error) {
user, accessToken, refreshToken, varror := a.authService.Login(ctx, req.Msg.Email, req.Msg.Password)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INTERNAL))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.LoginResponse{
Expand All @@ -39,7 +38,7 @@ func (a AuthGrpcServer) Login(ctx context.Context, req *connect.Request[authv1.L
func (a AuthGrpcServer) Register(ctx context.Context, req *connect.Request[authv1.RegisterRequest]) (*connect.Response[authv1.RegisterResponse], error) {
_, varror := a.authService.Register(ctx, req.Msg.Name, req.Msg.LastName, req.Msg.Username, req.Msg.Email, req.Msg.Password, req.Msg.VerifyEmailRedirectUrl)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INVALID_ARGUMENT))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeInvalidArgument)
}

res := connect.NewResponse(&authv1.RegisterResponse{})
Expand All @@ -50,7 +49,7 @@ func (a AuthGrpcServer) Register(ctx context.Context, req *connect.Request[authv
func (a AuthGrpcServer) Authenticate(ctx context.Context, req *connect.Request[authv1.AuthenticateRequest]) (*connect.Response[authv1.AuthenticateResponse], error) {
user, varror := a.authService.Authenticate(ctx, req.Msg.AccessToken)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_PERMISSION_DENIED))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.AuthenticateResponse{
Expand All @@ -63,7 +62,7 @@ func (a AuthGrpcServer) Authenticate(ctx context.Context, req *connect.Request[a
func (a AuthGrpcServer) ChangePassword(ctx context.Context, req *connect.Request[authv1.ChangePasswordRequest]) (*connect.Response[authv1.ChangePasswordResponse], error) {
varror := a.authService.ChangePassword(ctx, req.Msg.AccessToken, req.Msg.OldPassword, req.Msg.NewPassword)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeUnavailable)
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.ChangePasswordResponse{})
Expand All @@ -73,7 +72,7 @@ func (a AuthGrpcServer) ChangePassword(ctx context.Context, req *connect.Request
func (a AuthGrpcServer) RefreshToken(ctx context.Context, req *connect.Request[authv1.RefreshTokenRequest]) (*connect.Response[authv1.RefreshTokenResponse], error) {
newAccessToken, varror := a.authService.RefreshToken(ctx, req.Msg.UserId, req.Msg.RefreshToken)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_UNAVAILABLE))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.RefreshTokenResponse{
Expand All @@ -86,7 +85,7 @@ func (a AuthGrpcServer) RefreshToken(ctx context.Context, req *connect.Request[a
func (a AuthGrpcServer) SendResetPassword(ctx context.Context, req *connect.Request[authv1.SendResetPasswordRequest]) (*connect.Response[authv1.SendResetPasswordResponse], error) {
resetPasswordToken, timeout, varror := a.authService.SendResetPassword(ctx, req.Msg.Email, req.Msg.ResetPasswordRedirectUrl)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_UNAVAILABLE))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

timeoutProto := durationpb.New(timeout)
Expand All @@ -101,7 +100,7 @@ func (a AuthGrpcServer) SendResetPassword(ctx context.Context, req *connect.Requ
func (a AuthGrpcServer) SubmitResetPassword(ctx context.Context, req *connect.Request[authv1.SubmitResetPasswordRequest]) (*connect.Response[authv1.SubmitResetPasswordResponse], error) {
varror := a.authService.SubmitResetPassword(ctx, req.Msg.ResetPasswordToken, req.Msg.NewPassword)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_UNAVAILABLE))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.SubmitResetPasswordResponse{})
Expand All @@ -111,7 +110,7 @@ func (a AuthGrpcServer) SubmitResetPassword(ctx context.Context, req *connect.Re
func (a AuthGrpcServer) VerifyEmail(ctx context.Context, req *connect.Request[authv1.VerifyEmailRequest]) (*connect.Response[authv1.VerifyEmailResponse], error) {
varror := a.authService.VerifyEmail(ctx, req.Msg.VerifyEmailToken)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_UNAVAILABLE))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodePermissionDenied)
}

res := connect.NewResponse(&authv1.VerifyEmailResponse{})
Expand Down
9 changes: 4 additions & 5 deletions delivery/grpc/handlers/chat_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
chatv1 "github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/chat/v1"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/chat/v1/chatv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/proto_model_transformer"
"google.golang.org/genproto/googleapis/rpc/code"
)

type chatHandler struct {
Expand All @@ -34,7 +33,7 @@ func (h chatHandler) CreateChannel(ctx context.Context, req *connect.Request[cha

chat, varror := h.chatService.CreateChannel(ctx, userID, req.Msg.Title, req.Msg.Username, req.Msg.Description)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_UNAVAILABLE))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeUnavailable)
}

chatProto, err := proto_model_transformer.ChatToProto(*chat)
Expand Down Expand Up @@ -69,7 +68,7 @@ func (h chatHandler) CreateGroup(ctx context.Context, req *connect.Request[chatv

chat, varror := h.chatService.CreateGroup(ctx, userID, req.Msg.Title, req.Msg.Username, req.Msg.Description)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INTERNAL))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeUnavailable)
}

chatProto, err := proto_model_transformer.ChatToProto(*chat)
Expand Down Expand Up @@ -148,7 +147,7 @@ func (h chatHandler) JoinChat(ctx context.Context, req *connect.Request[chatv1.J

joinResult, varror := h.chatService.JoinChat(ctx, chatID, userID)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INTERNAL))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeInternal)
}

if !joinResult.Joined {
Expand All @@ -157,7 +156,7 @@ func (h chatHandler) JoinChat(ctx context.Context, req *connect.Request[chatv1.J

protoChat, err := proto_model_transformer.ChatToProto(*joinResult.UpdatedChat)
if err != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INTERNAL))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeInternal)
}

res := &connect.Response[chatv1.JoinChatResponse]{Msg: &chatv1.JoinChatResponse{
Expand Down
16 changes: 10 additions & 6 deletions delivery/grpc/handlers/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package grpc_handlers

import (
"context"
"time"

"connectrpc.com/connect"
"github.com/kavkaco/Kavka-Core/config"
"github.com/kavkaco/Kavka-Core/delivery/grpc/interceptor"
"github.com/kavkaco/Kavka-Core/infra/stream"
"github.com/kavkaco/Kavka-Core/internal/model"
Expand All @@ -21,24 +23,24 @@ func NewEventsGrpcHandler(logger *log.SubLogger, streamer stream.StreamSubscribe
return &eventsHandler{logger, streamer}
}

func (e *eventsHandler) SubscribeEventsStream(ctx context.Context, req *connect.Request[eventsv1.SubscribeEventsStreamRequest], str *connect.ServerStream[eventsv1.SubscribeEventsStreamResponse]) error {
func (e *eventsHandler) SubscribeEventsStream(ctx context.Context, req *connect.Request[eventsv1.SubscribeEventsStreamRequest], stream *connect.ServerStream[eventsv1.SubscribeEventsStreamResponse]) error {
userID := ctx.Value(interceptor.UserID{}).(model.UserID)

done := ctx.Done()
userCh := make(chan *eventsv1.SubscribeEventsStreamResponse)
e.streamer.UserSubscribe(userID, userCh)

e.logger.Debug("user stream established")
e.logger.Trace("user stream established")

for {
if str == nil {
if stream == nil {
e.logger.Error("user stream is closed")
return nil
}

select {
case <-done:
e.logger.Debug("user disconnected!")
e.logger.Trace("user disconnected!")
e.streamer.UserUnsubscribe(userID)
return nil
case event, ok := <-userCh:
Expand All @@ -47,9 +49,11 @@ func (e *eventsHandler) SubscribeEventsStream(ctx context.Context, req *connect.
continue
}

e.logger.Debug("events-handler", "event-name", event.Name)
if config.CurrentEnv == config.Development {
time.Sleep(500 * time.Millisecond)
}

err := str.Send(event)
err := stream.Send(event)
if err != nil {
log.Error("unable to send message with grpc: " + err.Error())
continue
Expand Down
9 changes: 5 additions & 4 deletions delivery/grpc/handlers/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (
"github.com/kavkaco/Kavka-Core/internal/model"
"github.com/kavkaco/Kavka-Core/internal/service/message"
"github.com/kavkaco/Kavka-Core/log"
messagev1 "github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/messagev1connect"

messagev1 "github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/v1"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/v1/messagev1connect"
"github.com/kavkaco/Kavka-Core/protobuf/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/utils/vali"
"google.golang.org/genproto/googleapis/rpc/code"
)

type MessageGrpcServer struct {
logger *log.SubLogger
messageService message.MessageService
messageService *message.MessageService
}

func NewMessageGrpcHandler(logger *log.SubLogger, messageService message.MessageService) messagev1connect.MessageServiceHandler {
func NewMessageGrpcHandler(logger *log.SubLogger, messageService *message.MessageService) messagev1connect.MessageServiceHandler {
return MessageGrpcServer{logger, messageService}
}

Expand Down
3 changes: 1 addition & 2 deletions delivery/grpc/handlers/search_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
searchv1 "github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/search/v1"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/search/v1/searchv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/proto_model_transformer"
"google.golang.org/genproto/googleapis/rpc/code"
)

type searchHandler struct {
Expand All @@ -25,7 +24,7 @@ func NewSearchGrpcHandler(logger *log.SubLogger, searchService search.SearchServ
func (s *searchHandler) Search(ctx context.Context, req *connect.Request[searchv1.SearchRequest]) (*connect.Response[searchv1.SearchResponse], error) {
result, varror := s.searchService.Search(ctx, req.Msg.Input)
if varror != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.Code(code.Code_INTERNAL))
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeUnavailable)
}

chats, err := proto_model_transformer.ChatsToProto(result.Chats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ func VarrorAsGrpcErrDetails(varror *vali.Varror) (*connect.ErrorDetail, error) {

func GrpcVarror(varror *vali.Varror, code connect.Code) error {
connectErr := connect.NewError(code, varror.Error)
varrorDetail, _ := VarrorAsGrpcErrDetails(varror)
varrorDetail, err := VarrorAsGrpcErrDetails(varror)
if err != nil {
return err
}

if len(varror.ValidationErrors) > 0 {
connectErr.AddDetail(varrorDetail)
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ services:
image: nats
ports:
- "4222:4222"
restart: always

networks:
default_network:
Expand Down
Loading

0 comments on commit 0d03f83

Please sign in to comment.