Skip to content

Commit

Permalink
Complete handlers and realtime communication (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
tahadostifam authored Aug 10, 2024
1 parent f794aed commit a0fde7a
Show file tree
Hide file tree
Showing 59 changed files with 4,091 additions and 544 deletions.
27 changes: 22 additions & 5 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ import (
"github.com/kavkaco/Kavka-Core/infra/stream"
"github.com/kavkaco/Kavka-Core/internal/service/auth"
"github.com/kavkaco/Kavka-Core/internal/service/chat"
"github.com/kavkaco/Kavka-Core/internal/service/message"
"github.com/kavkaco/Kavka-Core/internal/service/search"
"github.com/kavkaco/Kavka-Core/log"
"github.com/kavkaco/Kavka-Core/pkg/email"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/auth/v1/authv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/chat/v1/chatv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/events/v1/eventsv1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/message/messagev1connect"
"github.com/kavkaco/Kavka-Core/protobuf/gen/go/protobuf/search/v1/searchv1connect"
"github.com/kavkaco/Kavka-Core/utils/hash"
"github.com/rs/cors"
auth_manager "github.com/tahadostifam/go-auth-manager"
Expand Down Expand Up @@ -80,14 +84,16 @@ func main() {
streamSubscriber, err := stream.NewStreamSubscriber(natsClient, log.NewSubLogger("stream-subscriber"))
handleError(err)

// [=== Init Internal Services ===]
// [=== Init Internal Services & Repositories ===]
hashManager := hash.NewHashManager(hash.DefaultHashParams)

userRepo := repository_mongo.NewUserMongoRepository(mongoDB)
// userService := user.NewUserService(userRepo)

authRepo := repository_mongo.NewAuthMongoRepository(mongoDB)

searchRepo := repository_mongo.NewSearchRepository(mongoDB)

var emailService email.EmailService
if config.CurrentEnv == config.Production {
emailService = email.NewEmailService(&cfg.Email, "email/templates")
Expand All @@ -97,13 +103,16 @@ func main() {

authService := auth.NewAuthService(authRepo, userRepo, authManager, hashManager, emailService)

messageRepo := repository_mongo.NewMessageMongoRepository(mongoDB)

chatRepo := repository_mongo.NewChatMongoRepository(mongoDB)
chatService := chat.NewChatService(log.NewSubLogger("chat-service"), chatRepo, userRepo, streamPublisher)
chatService := chat.NewChatService(log.NewSubLogger("chat-service"), chatRepo, userRepo, messageRepo, streamPublisher)

messageService := message.NewMessageService(log.NewSubLogger("message-service"), messageRepo, chatRepo, userRepo, streamPublisher)

// messageRepo := repository.NewMessageRepository(mongoDB)
// messageService := message.NewMessageService(messageRepo, chatRepo)
searchService := search.NewSearchService(log.NewSubLogger("search-service"), searchRepo)

// [=== Init grpc server ===]
// [=== Init Grpc Server ===]
grpcListenAddr := fmt.Sprintf("%s:%d", cfg.HTTP.Host, cfg.HTTP.Port)
gRPCRouter := http.NewServeMux()

Expand All @@ -119,9 +128,17 @@ func main() {
eventsGrpcHandler := grpc_handlers.NewEventsGrpcHandler(log.NewSubLogger("events-handler"), streamSubscriber)
eventsGrpcRoute, eventsGrpcRouter := eventsv1connect.NewEventsServiceHandler(eventsGrpcHandler, interceptors)

messageGrpcHandler := grpc_handlers.NewMessageGrpcHandler(log.NewSubLogger("message-handler"), messageService)
messageGrpcRoute, messageGrpcRouter := messagev1connect.NewMessageServiceHandler(messageGrpcHandler, interceptors)

searchGrpcHandler := grpc_handlers.NewSearchGrpcHandler(log.NewSubLogger("message-handler"), searchService)
searchGrpcRoute, searchGrpcRouter := searchv1connect.NewSearchServiceHandler(searchGrpcHandler, interceptors)

gRPCRouter.Handle(authGrpcRoute, authGrpcRouter)
gRPCRouter.Handle(chatGrpcRoute, chatGrpcRouter)
gRPCRouter.Handle(eventsGrpcRoute, eventsGrpcRouter)
gRPCRouter.Handle(messageGrpcRoute, messageGrpcRouter)
gRPCRouter.Handle(searchGrpcRoute, searchGrpcRouter)

// [=== PPROF Memory Profiling Tool ===]
if config.CurrentEnv == config.Development {
Expand Down
53 changes: 46 additions & 7 deletions database/mongo_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,54 @@ func GetMongoDBInstance(uri, dbName string) (*mongo.Database, error) {
}

func ConfigureCollections(db *mongo.Database) {
db.Collection(UsersCollection).Indexes().CreateOne(context.Background(), mongo.IndexModel{ //nolint
Keys: bson.D{{Key: "email", Value: 1}},
Options: options.Index().SetUnique(true),
})
handleError := func(err error) {
if err != nil {
panic(err)
}
}

db.Collection(UsersCollection).Indexes().CreateOne(context.Background(), mongo.IndexModel{ //nolint
Keys: bson.D{{Key: "username", Value: 1}},
Options: options.Index().SetUnique(true),
// Users indexes

_, err := db.Collection(UsersCollection).Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.M{"email": 1},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.M{"username": 1},
Options: options.Index().SetUnique(true),
},
})
handleError(err)

_, err = db.Collection(UsersCollection).Indexes().CreateMany(context.Background(), []mongo.IndexModel{ //nolint
{
Keys: bson.D{
{Key: "name", Value: "text"},
{Key: "email", Value: "text"},
{Key: "username", Value: "text"},
{Key: "last_name", Value: "text"},
},
},
})
handleError(err)

// Chats indexes

_, err = db.Collection(ChatsCollection).Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "chat_detail.username", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "chat_detail.title", Value: "text"},
{Key: "chat_detail.username", Value: "text"},
},
Options: options.Index(),
},
})
handleError(err)
}

func IsDuplicateKeyError(err error) bool {
Expand Down
60 changes: 34 additions & 26 deletions database/repo_mongo/chat_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,6 @@ func NewChatMongoRepository(db *mongo.Database) repository.ChatRepository {
return &chatRepository{db.Collection(database.UsersCollection), db.Collection(database.ChatsCollection)}
}

func (repo *chatRepository) UpdateChatLastMessage(ctx context.Context, chatID model.ChatID, lastMessage model.LastMessage) error {
filter := bson.M{"_id": chatID}
update := bson.M{"$set": bson.M{"last_message": lastMessage}}

result, err := repo.chatsCollection.UpdateOne(ctx, filter, update)
if err != nil {
return err
}

if result.MatchedCount == 0 || result.ModifiedCount == 0 {
return repository.ErrNotModified
}

return nil
}

func (repo *chatRepository) AddToUsersChatsList(ctx context.Context, userID string, chatID primitive.ObjectID) error {
filter := bson.M{"user_id": userID}
update := bson.M{
Expand All @@ -51,12 +35,7 @@ func (repo *chatRepository) AddToUsersChatsList(ctx context.Context, userID stri
return nil
}

// TODO
func (repo *chatRepository) SearchInChats(ctx context.Context, key string) ([]model.Chat, error) {
panic("unimplemented")
}

// TODO
// FIXME
func (repo *chatRepository) GetChatMembers(chatID model.ChatID) []model.Member {
return []model.Member{}
}
Expand Down Expand Up @@ -88,15 +67,44 @@ func (repo *chatRepository) Destroy(ctx context.Context, chatID model.ChatID) er
return nil
}

func (repo *chatRepository) FindManyByChatID(ctx context.Context, chatIDs []model.ChatID) ([]model.Chat, error) {
filter := bson.M{"_id": bson.M{"$in": chatIDs}}
func (repo *chatRepository) GetUserChats(ctx context.Context, chatIDs []model.ChatID) ([]model.ChatGetter, error) {
pipeline := bson.A{
bson.M{
"$match": bson.M{
"_id": bson.M{"$in": chatIDs},
},
},
bson.M{
"$lookup": bson.M{
"from": "messages",
"localField": "_id",
"foreignField": "chat_id",
"as": "chat_messages",
},
},
bson.M{
"$unwind": "$chat_messages",
},
bson.M{
"$addFields": bson.M{
"last_message": bson.M{
"$last": "$chat_messages.messages",
},
},
},
bson.M{
"$project": bson.M{
"chat_messages": 0,
},
},
}

cursor, err := repo.chatsCollection.Find(ctx, filter)
cursor, err := repo.chatsCollection.Aggregate(ctx, pipeline)
if err != nil {
return nil, err
}

var chats []model.Chat
var chats []model.ChatGetter

decodeErr := cursor.All(ctx, &chats)
if decodeErr != nil {
Expand Down
76 changes: 56 additions & 20 deletions database/repo_mongo/message_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package repository_mongo

import (
"context"
"errors"

"github.com/kavkaco/Kavka-Core/database"
"github.com/kavkaco/Kavka-Core/internal/repository"
Expand All @@ -19,29 +20,29 @@ func NewMessageMongoRepository(db *mongo.Database) repository.MessageRepository
return &messageRepository{db.Collection(database.MessagesCollection)}
}

func (repo *messageRepository) FindMessage(ctx context.Context, chatID model.ChatID, messageID model.MessageID) (*model.Message, error) {
func (repo *messageRepository) FindMessage(ctx context.Context, chatID model.ChatID, messageID model.MessageID) (*model.MessageGetter, error) {
filter := bson.M{"chat_id": chatID}

result := repo.messagesCollection.FindOne(ctx, filter)
if result.Err() != nil {
return nil, result.Err()
}

var message model.Message
var messageStore *model.MessageStore
var message model.MessageGetter
var chatMessages *model.ChatMessages

err := result.Decode(&messageStore)
err := result.Decode(&chatMessages)
if err != nil {
return nil, err
}

for i, m := range messageStore.Messages {
if m.MessageID == messageID {
message = m
for i, m := range chatMessages.Messages {
if m.Message.MessageID == messageID {
message = *m
break
}

if i == len(messageStore.Messages)-1 {
if i == len(chatMessages.Messages)-1 {
return nil, repository.ErrNotFound
}
}
Expand All @@ -50,9 +51,9 @@ func (repo *messageRepository) FindMessage(ctx context.Context, chatID model.Cha
}

func (repo *messageRepository) Create(ctx context.Context, chatID model.ChatID) error {
messageStoreModel := model.MessageStore{
messageStoreModel := model.ChatMessages{
ChatID: chatID,
Messages: []model.Message{},
Messages: []*model.MessageGetter{},
}
_, err := repo.messagesCollection.InsertOne(ctx, messageStoreModel)
if err != nil {
Expand All @@ -62,24 +63,59 @@ func (repo *messageRepository) Create(ctx context.Context, chatID model.ChatID)
return nil
}

func (repo *messageRepository) FetchMessages(ctx context.Context, chatID model.ChatID) ([]model.Message, error) {
filter := bson.M{"chat_id": chatID}
result := repo.messagesCollection.FindOne(ctx, filter)
if result.Err() != nil {
if database.IsRowExistsError(result.Err()) {
return nil, repository.ErrNotFound
func (repo *messageRepository) FetchMessages(ctx context.Context, chatID model.ChatID) (*model.ChatMessages, error) {
pipeline := bson.A{
bson.M{
"$match": bson.M{
"chat_id": chatID,
},
},
bson.M{"$unwind": bson.M{"path": "$messages"}},
bson.M{
"$lookup": bson.M{
"from": "users",
"localField": "messages.sender_id",
"foreignField": "user_id",
"as": "sender",
},
},
bson.M{"$unwind": bson.M{"path": "$sender"}},
bson.M{
"$group": bson.M{
"_id": "$_id",
"chat_id": bson.M{
"$first": "$chat_id",
},
"messages": bson.M{
"$push": bson.M{
"message": "$messages",
"sender": "$sender",
},
},
},
},
}

cursor, err := repo.messagesCollection.Aggregate(ctx, pipeline)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return &model.ChatMessages{}, nil
}

return nil, result.Err()
return nil, err
}

var messageStore *model.MessageStore
err := result.Decode(&messageStore)
var chatMessages []model.ChatMessages
err = cursor.All(ctx, &chatMessages)
if err != nil {
return nil, err
}

return messageStore.Messages, nil
if len(chatMessages) > 0 {
return &chatMessages[0], nil
}

return &model.ChatMessages{}, nil
}

func (repo *messageRepository) Insert(ctx context.Context, chatID model.ChatID, message *model.Message) (*model.Message, error) {
Expand Down
Loading

0 comments on commit a0fde7a

Please sign in to comment.