Skip to content

Commit

Permalink
refactor: direct chat creation business logic improved
Browse files Browse the repository at this point in the history
  • Loading branch information
tahadostifam committed Sep 23, 2024
1 parent 3f70f30 commit 92eacb4
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 50 deletions.
26 changes: 14 additions & 12 deletions database/repo_mongo/chat_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewChatMongoRepository(db *mongo.Database) repository.ChatRepository {
return &chatRepository{db.Collection(database.UsersCollection), db.Collection(database.ChatsCollection)}
}

func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error {
func (repo *chatRepository) AddToUsersChatsList(ctx context.Context, userID string, chatID primitive.ObjectID) error {
userFilter := bson.M{"user_id": userID}
userUpdate := bson.M{
"$addToSet": bson.M{
Expand All @@ -33,18 +33,20 @@ func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userI
return err
}

if chatType != "direct" {
chatFilter := bson.M{"_id": chatID}
chatUpdate := bson.M{
"$addToSet": bson.M{
"chat_detail.members": userID,
},
}
return nil
}

_, err = repo.chatsCollection.UpdateOne(ctx, chatFilter, chatUpdate)
if err != nil {
return err
}
func (repo *chatRepository) JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error {
chatFilter := bson.M{"_id": chatID}
chatUpdate := bson.M{
"$addToSet": bson.M{
"chat_detail.members": userID,
},
}

_, err := repo.chatsCollection.UpdateOne(ctx, chatFilter, chatUpdate)
if err != nil {
return err
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion delivery/grpc/handlers/auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"connectrpc.com/connect"
grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers"
"github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer"

"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/service/auth"

authv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/auth/v1"
Expand Down
15 changes: 8 additions & 7 deletions delivery/grpc/handlers/chat_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers"
"github.com/kavkaco/Kavka-Core/delivery/grpc/interceptor"

"github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/model"
"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/service"
"github.com/kavkaco/Kavka-Core/internal/service/chat"
"github.com/kavkaco/Kavka-Core/log"
chatv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/chat/v1"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (h chatHandler) GetDirectChat(ctx context.Context, req *connect.Request[cha

chatProto, err := proto_model_transformer.ChatToProto(*chatDto)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := connect.NewResponse(&chatv1.GetDirectChatResponse{
Expand All @@ -61,7 +62,7 @@ func (h chatHandler) CreateChannel(ctx context.Context, req *connect.Request[cha

chatProto, err := proto_model_transformer.ChatToProto(*chat)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := connect.NewResponse(&chatv1.CreateChannelResponse{
Expand All @@ -88,7 +89,7 @@ func (h chatHandler) CreateDirect(ctx context.Context, req *connect.Request[chat

chatProto, err := proto_model_transformer.ChatToProto(*chat)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := connect.NewResponse(&chatv1.CreateDirectResponse{
Expand All @@ -111,7 +112,7 @@ func (h chatHandler) CreateGroup(ctx context.Context, req *connect.Request[chatv

chatProto, err := proto_model_transformer.ChatToProto(*chat)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := connect.NewResponse(&chatv1.CreateGroupResponse{
Expand Down Expand Up @@ -140,7 +141,7 @@ func (h chatHandler) GetChat(ctx context.Context, req *connect.Request[chatv1.Ge
chatGetter := model.NewChatDTO(chat)
chatProto, err := proto_model_transformer.ChatToProto(*chatGetter)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := &connect.Response[chatv1.GetChatResponse]{
Expand Down Expand Up @@ -199,7 +200,7 @@ func (h chatHandler) JoinChat(ctx context.Context, req *connect.Request[chatv1.J

protoChat, err := proto_model_transformer.ChatToProto(*joinResult.UpdatedChat)
if err != nil {
return nil, grpc_helpers.GrpcVarror(varror, connect.CodeInternal)
return nil, connect.NewError(connect.CodeInternal, service.ErrProtoMarshaling)
}

res := &connect.Response[chatv1.JoinChatResponse]{Msg: &chatv1.JoinChatResponse{
Expand Down
2 changes: 1 addition & 1 deletion delivery/grpc/handlers/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers"
"github.com/kavkaco/Kavka-Core/delivery/grpc/interceptor"
"github.com/kavkaco/Kavka-Core/internal/model"
"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/service/message"
"github.com/kavkaco/Kavka-Core/log"

"github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/utils/vali"
messagev1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/message/v1"
"github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/message/v1/messagev1connect"
Expand Down
2 changes: 1 addition & 1 deletion delivery/grpc/handlers/search_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"connectrpc.com/connect"
grpc_helpers "github.com/kavkaco/Kavka-Core/delivery/grpc/helpers"
"github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/service/search"
"github.com/kavkaco/Kavka-Core/log"
searchv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/search/v1"
Expand Down
4 changes: 2 additions & 2 deletions internal/repository/chat_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/kavkaco/Kavka-Core/internal/model"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type ChatRepository interface {
Expand All @@ -14,5 +13,6 @@ type ChatRepository interface {
GetUserChats(ctx context.Context, userID model.UserID, chatIDs []model.ChatID) ([]model.ChatDTO, error)
GetDirectChat(ctx context.Context, userID model.UserID, recipientUserID model.UserID) (*model.Chat, error)
GetChatMembers(chatID model.ChatID) []model.Member
JoinChat(ctx context.Context, chatType string, userID string, chatID primitive.ObjectID) error
JoinChat(ctx context.Context, chatType string, userID string, chatID model.ChatID) error
AddToUsersChatsList(ctx context.Context, userID string, chatID model.ChatID) error
}
79 changes: 54 additions & 25 deletions internal/service/chat/chat_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (

"github.com/kavkaco/Kavka-Core/infra/stream"
"github.com/kavkaco/Kavka-Core/internal/model"
"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/repository"
"github.com/kavkaco/Kavka-Core/internal/service"
"github.com/kavkaco/Kavka-Core/log"
"github.com/kavkaco/Kavka-Core/utils"
"github.com/kavkaco/Kavka-Core/utils/vali"
eventsv1 "github.com/kavkaco/Kavka-ProtoBuf/gen/go/protobuf/events/v1"
"google.golang.org/protobuf/proto"
)

const SubjChats = "chats"
Expand Down Expand Up @@ -133,7 +137,12 @@ func (s *ChatService) CreateDirect(ctx context.Context, userID model.UserID, rec
return nil, &vali.Varror{Error: ErrMessageStoreCreation}
}

err = s.chatRepo.JoinChat(ctx, createdChat.ChatType, userID, createdChat.ChatID)
err = s.chatRepo.AddToUsersChatsList(ctx, userID, createdChat.ChatID)
if err != nil {
return nil, &vali.Varror{Error: ErrUnableToAddChatToUsersList}
}

err = s.chatRepo.AddToUsersChatsList(ctx, recipientUserID, createdChat.ChatID)
if err != nil {
return nil, &vali.Varror{Error: ErrUnableToAddChatToUsersList}
}
Expand All @@ -156,6 +165,34 @@ func (s *ChatService) CreateDirect(ctx context.Context, userID model.UserID, rec
Recipient: recipient,
}

chatProto, err := proto_model_transformer.ChatToProto(*chatDTO)
if err != nil {
return nil, &vali.Varror{Error: service.ErrProtoMarshaling}
}

// Let's tell the recipient that this user created a direct chat with you
payloadProtoBuf, marshalErr := proto.Marshal(&eventsv1.SubscribeEventsStreamResponse{
Name: "add-chat",
Type: eventsv1.SubscribeEventsStreamResponse_TYPE_ADD_CHAT,
Payload: &eventsv1.SubscribeEventsStreamResponse_AddChat{
AddChat: &eventsv1.AddChat{
Chat: chatProto,
},
},
},
)
if marshalErr != nil {
return nil, &vali.Varror{Error: service.ErrProtoMarshaling}
}

s.eventPublisher.Publish(&eventsv1.StreamEvent{

Check failure on line 188 in internal/service/chat/chat_service.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `s.eventPublisher.Publish` is not checked (errcheck)
SenderUserId: userID,
ReceiversUserId: []model.UserID{
finalRecipientUserID,
},
Payload: payloadProtoBuf,
})

return chatDTO, nil
}

Expand Down Expand Up @@ -183,19 +220,15 @@ func (s *ChatService) CreateGroup(ctx context.Context, userID model.UserID, titl
Text: "Group created",
}, userID)

go func() {
createErr := s.messageRepo.Create(context.TODO(), savedChat.ChatID)
if createErr != nil {
s.logger.Error("message store creation failed: " + createErr.Error())
return
}
err = s.messageRepo.Create(context.TODO(), savedChat.ChatID)
if err != nil {
return nil, &vali.Varror{Error: ErrJoinDirectChat}
}

_, createErr = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel)
if createErr != nil {
s.logger.Error("failed to insert message in group creation: " + createErr.Error())
return
}
}()
_, err = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel)
if err != nil {
return nil, &vali.Varror{Error: ErrJoinDirectChat}
}

err = s.chatRepo.JoinChat(ctx, savedChat.ChatType, userID, savedChat.ChatID)
if err != nil {
Expand Down Expand Up @@ -232,19 +265,15 @@ func (s *ChatService) CreateChannel(ctx context.Context, userID model.UserID, ti
Text: "Channel created",
}, userID)

go func() {
createError := s.messageRepo.Create(context.TODO(), savedChat.ChatID)
if createError != nil {
s.logger.Error("message store creation failed: " + createError.Error())
return
}
err = s.messageRepo.Create(context.TODO(), savedChat.ChatID)
if err != nil {
return nil, &vali.Varror{Error: ErrMessageStoreCreation}
}

_, createError = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel)
if createError != nil {
s.logger.Error("failed to insert message in channel creation: " + createError.Error())
return
}
}()
_, err = s.messageRepo.Insert(context.TODO(), savedChat.ChatID, messageModel)
if err != nil {
return nil, &vali.Varror{Error: ErrMessageStoreCreation}
}

err = s.chatRepo.JoinChat(ctx, savedChat.ChatType, userID, savedChat.ChatID)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/service/common_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package service

import "errors"

var (
ErrProtoMarshaling = errors.New("proto marshaling failed")
)
2 changes: 1 addition & 1 deletion internal/service/message/message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package message
import (
"context"

"github.com/kavkaco/Kavka-Core/delivery/grpc/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/infra/stream"
"github.com/kavkaco/Kavka-Core/internal/model"
"github.com/kavkaco/Kavka-Core/internal/model/proto_model_transformer"
"github.com/kavkaco/Kavka-Core/internal/repository"
"github.com/kavkaco/Kavka-Core/log"
"github.com/kavkaco/Kavka-Core/utils/vali"
Expand Down

0 comments on commit 92eacb4

Please sign in to comment.