diff --git a/Makefile b/Makefile index 3a8aa91..2b8ccad 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,7 @@ docker: .PHONY: run # run run: - docker-compose -f docker-compose.dev.yml build --no-cache && docker-compose -f docker-compose.dev.yml up --force-recreate -d + docker-compose -f docker-compose.dev.yml build && docker-compose -f docker-compose.dev.yml up --force-recreate -d .PHONY: stop # stop diff --git a/api/pbuf-registry/v1/entities.proto b/api/pbuf-registry/v1/entities.proto index a895817..31d1bfb 100644 --- a/api/pbuf-registry/v1/entities.proto +++ b/api/pbuf-registry/v1/entities.proto @@ -17,6 +17,9 @@ message Module { // The draft tags of the module. repeated string draft_tags = 4; + + // Packages that uses in this module + repeated string packages = 5; } // ProtoFile is a proto file registered in the registry. diff --git a/cmd/main.go b/cmd/main.go index cf44c89..8be2600 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -29,9 +29,9 @@ type Launcher struct { mainApp *kratos.App debugApp *kratos.App - compactionDaemon background.CompactionDaemon + compactionDaemon background.Daemon + protoParsingDaemon background.Daemon } - func main() { config.NewLoader().MustLoad() @@ -46,7 +46,8 @@ func main() { defer pool.Close() registryRepository := data.NewRegistryRepository(pool, logger) - registryServer := server.NewRegistryServer(registryRepository, logger) + metadataRepository := data.NewMetadataRepository(pool, logger) + registryServer := server.NewRegistryServer(registryRepository, metadataRepository, logger) app := kratos.New( kratos.ID(id), @@ -78,12 +79,12 @@ func main() { mainApp: app, debugApp: debugApp, - compactionDaemon: background.NewCompactionDaemon(registryRepository, logger), + compactionDaemon: background.NewCompactionDaemon(registryRepository, logger), + protoParsingDaemon: background.NewProtoParsingDaemon(metadataRepository, logger), } err = CreateRootCommand(launcher).Execute() if err != nil { - logHelper.Errorf("failed to execute command: %v", err) - return + logHelper.Errorf("failed to run application: %v", err) } } diff --git a/cmd/root.go b/cmd/root.go index 689efba..f552973 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,7 +4,9 @@ import ( "time" "github.com/go-co-op/gocron" + "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" + "github.com/pbufio/pbuf-registry/internal/background" "github.com/spf13/cobra" ) @@ -23,6 +25,7 @@ func CreateRootCommand(launcher *Launcher) *cobra.Command { } rootCommand.AddCommand(CreateCompactionDaemon(launcher)) + rootCommand.AddCommand(CreateProtoParsingDaemon(launcher)) return rootCommand } @@ -32,33 +35,57 @@ func CreateCompactionDaemon(launcher *Launcher) *cobra.Command { Use: "compaction", Short: "Run compaction daemon", Run: func(cmd *cobra.Command, args []string) { - s := gocron.NewScheduler(time.UTC) + runBackgroundDaemon( + launcher.config.Daemons.Compaction.CronSchedule, + launcher.compactionDaemon, + launcher.debugApp, + ) + }, + } - // start every hour - _, err := s.Cron(launcher.config.Daemons.Compaction.CronSchedule).Do(func() { - err := launcher.compactionDaemon.Run() - if err != nil { - log.Fatalf("failed to run compaction daemon: %v", err) - } - }) + return compactionDaemonCommand +} - if err != nil { - log.Fatalf("failed to create cron job: %v", err) - } +func CreateProtoParsingDaemon(launcher *Launcher) *cobra.Command { + protoParsingDaemonCommand := &cobra.Command{ + Use: "proto-parsing", + Short: "Run proto parsing daemon", + Run: func(cmd *cobra.Command, args []string) { + runBackgroundDaemon( + launcher.config.Daemons.ProtoParsing.CronSchedule, + launcher.protoParsingDaemon, + launcher.debugApp, + ) + }, + } - // start the scheduler - s.StartAsync() + return protoParsingDaemonCommand +} - err = launcher.debugApp.Run() - if err != nil { - log.Fatalf("failed to run debug app: %v", err) - } +func runBackgroundDaemon(cronSchedule string, daemon background.Daemon, debugApp *kratos.App) { + s := gocron.NewScheduler(time.UTC) - s.Stop() + // start every hour + _, err := s.Cron(cronSchedule).Do(func() { + err := daemon.Run() + if err != nil { + log.Fatalf("failed to run %s daemon: %v", daemon.Name(), err) + } + }) - log.Infof("Compaction daemon stopped") - }, + if err != nil { + log.Fatalf("failed to create cron job: %v", err) } - return compactionDaemonCommand + // start the scheduler + s.StartAsync() + + err = debugApp.Run() + if err != nil { + log.Fatalf("failed to run debug app: %v", err) + } + + s.Stop() + + log.Infof("%s daemon stopped", daemon.Name()) } diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 1dd5b8b..262f233 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -40,8 +40,6 @@ services: depends_on: - db - pbuf-registry - ports: - - "8083:8082" healthcheck: test: wget -O - http://localhost:8082/healthz || exit 1 interval: 5s @@ -50,4 +48,20 @@ services: environment: DATA_DATABASE_DSN: "postgres://pbuf:pbuf@db:5432/pbuf_registry?sslmode=disable" command: > - sh -c "/app/pbuf-registry compaction" \ No newline at end of file + sh -c "/app/pbuf-registry compaction" + pbuf-registry-protoparsing: + build: + context: . + restart: always + depends_on: + - db + - pbuf-registry + healthcheck: + test: wget -O - http://localhost:8082/healthz || exit 1 + interval: 5s + timeout: 10s + retries: 5 + environment: + DATA_DATABASE_DSN: "postgres://pbuf:pbuf@db:5432/pbuf_registry?sslmode=disable" + command: > + sh -c "/app/pbuf-registry proto-parsing" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 244a162..6627218 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: timeout: 10s retries: 5 pbuf-registry: - image: ghcr.io/pbufio/registry:v0.4.0-wip.1 + image: ghcr.io/pbufio/registry:v0.4.0-wip.2 restart: always depends_on: - db @@ -49,7 +49,7 @@ services: command: > sh -c "/app/pbuf-migrations && /app/pbuf-registry" pbuf-registry-compaction: - image: ghcr.io/pbufio/registry:v0.4.0-wip.1 + image: ghcr.io/pbufio/registry:v0.4.0-wip.2 restart: always depends_on: - db @@ -65,6 +65,23 @@ services: DATA_DATABASE_DSN: "postgres://pbuf:pbuf@db:5432/pbuf_registry?sslmode=disable" command: > sh -c "/app/pbuf-registry compaction" + pbuf-registry-protoparser: + image: ghcr.io/pbufio/registry:v0.4.0-wip.2 + restart: always + depends_on: + - db + - pbuf-registry + ports: + - "127.0.0.1:8084:8082" + healthcheck: + test: wget -O - http://localhost:8082/healthz || exit 1 + interval: 5s + timeout: 10s + retries: 5 + environment: + DATA_DATABASE_DSN: "postgres://pbuf:pbuf@db:5432/pbuf_registry?sslmode=disable" + command: > + sh -c "/app/pbuf-registry proto-parsing" networks: internal: \ No newline at end of file diff --git a/gen/pbuf-registry/v1/entities.pb.go b/gen/pbuf-registry/v1/entities.pb.go index 4cfec12..7edde1c 100644 --- a/gen/pbuf-registry/v1/entities.pb.go +++ b/gen/pbuf-registry/v1/entities.pb.go @@ -34,6 +34,8 @@ type Module struct { Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` // The draft tags of the module. DraftTags []string `protobuf:"bytes,4,rep,name=draft_tags,json=draftTags,proto3" json:"draft_tags,omitempty"` + // Packages that uses in this module + Packages []string `protobuf:"bytes,5,rep,name=packages,proto3" json:"packages,omitempty"` } func (x *Module) Reset() { @@ -96,6 +98,13 @@ func (x *Module) GetDraftTags() []string { return nil } +func (x *Module) GetPackages() []string { + if x != nil { + return x.Packages + } + return nil +} + // ProtoFile is a proto file registered in the registry. type ProtoFile struct { state protoimpl.MessageState @@ -218,22 +227,24 @@ var file_pbuf_registry_v1_entities_proto_rawDesc = []byte{ 0x0a, 0x1f, 0x70, 0x62, 0x75, 0x66, 0x2d, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x70, 0x62, 0x75, 0x66, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x2e, - 0x76, 0x31, 0x22, 0x5f, 0x0a, 0x06, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x0e, 0x0a, 0x02, + 0x76, 0x31, 0x22, 0x7b, 0x0a, 0x06, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x64, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x64, 0x72, 0x61, 0x66, 0x74, 0x54, - 0x61, 0x67, 0x73, 0x22, 0x41, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x46, 0x69, 0x6c, 0x65, - 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, - 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, - 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x32, 0x0a, 0x0a, 0x44, 0x65, 0x70, 0x65, 0x6e, 0x64, - 0x65, 0x6e, 0x63, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x42, 0x18, 0x5a, 0x16, 0x70, 0x62, - 0x75, 0x66, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, - 0x31, 0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x61, 0x67, 0x65, 0x73, 0x22, + 0x41, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1a, 0x0a, 0x08, + 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x74, 0x22, 0x32, 0x0a, 0x0a, 0x44, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x63, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x42, 0x18, 0x5a, 0x16, 0x70, 0x62, 0x75, 0x66, 0x72, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/gen/pbuf-registry/v1/registry.swagger.json b/gen/pbuf-registry/v1/registry.swagger.json index 0dad71c..4a4c686 100644 --- a/gen/pbuf-registry/v1/registry.swagger.json +++ b/gen/pbuf-registry/v1/registry.swagger.json @@ -468,6 +468,13 @@ "type": "string" }, "description": "The draft tags of the module." + }, + "packages": { + "type": "array", + "items": { + "type": "string" + }, + "title": "Packages that uses in this module" } }, "description": "Module is a module registered in the registry." diff --git a/internal/background/background.go b/internal/background/background.go new file mode 100644 index 0000000..b79a3bf --- /dev/null +++ b/internal/background/background.go @@ -0,0 +1,6 @@ +package background + +type Daemon interface { + Name() string + Run() error +} diff --git a/internal/background/compaction.go b/internal/background/compaction.go index 36e30e7..6c2b471 100644 --- a/internal/background/compaction.go +++ b/internal/background/compaction.go @@ -7,22 +7,26 @@ import ( "github.com/pbufio/pbuf-registry/internal/data" ) -type CompactionDaemon interface { - Run() error -} +const ( + compactionDaemonName = "compaction" +) type compactionDaemon struct { registryRepository data.RegistryRepository log *log.Helper } -func NewCompactionDaemon(registryRepository data.RegistryRepository, logger log.Logger) CompactionDaemon { +func NewCompactionDaemon(registryRepository data.RegistryRepository, logger log.Logger) Daemon { return &compactionDaemon{ registryRepository: registryRepository, log: log.NewHelper(log.With(logger, "module", "background/CompactionDaemon")), } } +func (d *compactionDaemon) Name() string { + return compactionDaemonName +} + func (d *compactionDaemon) Run() error { d.log.Infof("Running compaction") diff --git a/internal/background/protoparsing.go b/internal/background/protoparsing.go new file mode 100644 index 0000000..e9511d5 --- /dev/null +++ b/internal/background/protoparsing.go @@ -0,0 +1,68 @@ +package background + +import ( + "context" + + "github.com/go-kratos/kratos/v2/log" + "github.com/pbufio/pbuf-registry/internal/data" + "github.com/pbufio/pbuf-registry/internal/utils" +) + +const ( + protoParsingDaemonName = "proto parsing" +) + +type protoParsingDaemon struct { + metadataRepository data.MetadataRepository + log *log.Helper +} + +func NewProtoParsingDaemon(metadataRepository data.MetadataRepository, logger log.Logger) Daemon { + return &protoParsingDaemon{ + metadataRepository: metadataRepository, + log: log.NewHelper(log.With(logger, "module", "background/ProtoParsingDaemon")), + } +} + +func (p protoParsingDaemon) Name() string { + return protoParsingDaemonName +} + +func (p protoParsingDaemon) Run() error { + p.log.Infof("Running proto parsing") + + ctx := context.Background() + + // fetch tags that has not been processed yet + tagIds, err := p.metadataRepository.GetUnprocessedTagIds(ctx) + if err != nil { + p.log.Errorf("GetUnprocessedTags error: %v", err) + return err + } + + // iterate over tags and parse proto files + for _, tagId := range tagIds { + // get all proto files for tag + protofiles, err := p.metadataRepository.GetProtoFilesForTagId(ctx, tagId) + if err != nil { + p.log.Errorf("GetProtoFilesForTagId error: %v", err) + continue + } + // parse proto files + parsedProtoFiles, err := utils.ParseProtoFilesContents(protofiles) + if err != nil { + p.log.Errorf("ParseProtoFilesContents error: %v", err) + return err + } + + // save parsed proto files to database + err = p.metadataRepository.SaveParsedProtoFiles(ctx, tagId, parsedProtoFiles) + if err != nil { + p.log.Errorf("SaveParsedProtoFiles error: %v", err) + return err + } + } + + p.log.Infof("Proto parsing finished") + return nil +} diff --git a/internal/config/config.go b/internal/config/config.go index b198879..386541d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,8 +51,11 @@ type Config struct { Daemons struct { Compaction struct { CronSchedule string `mapstructure:"cron"` - } - } + } `mapstructure:"compaction"` + ProtoParsing struct { + CronSchedule string `mapstructure:"cron"` + } `mapstructure:"protoparsing"` + } `mapstructure:"daemons"` } // Cfg is the global config diff --git a/internal/config/config.yaml b/internal/config/config.yaml index 277f73e..5053868 100644 --- a/internal/config/config.yaml +++ b/internal/config/config.yaml @@ -25,4 +25,6 @@ data: daemons: compaction: - cron: "0 * * * *" \ No newline at end of file + cron: "0 * * * *" + protoparsing: + cron: "*/1 * * * *" diff --git a/internal/data/common_test.go b/internal/data/common_test.go index 8312889..3a33a0f 100644 --- a/internal/data/common_test.go +++ b/internal/data/common_test.go @@ -21,6 +21,7 @@ var suite TestSuite type TestSuite struct { psqlContainer *test_utils.PostgreSQLContainer registryRepository RegistryRepository + metadataRepository MetadataRepository } func (s *TestSuite) SetupSuite() { @@ -51,6 +52,7 @@ func (s *TestSuite) SetupSuite() { migrations.Migrate(db) s.registryRepository = NewRegistryRepository(pool, log.DefaultLogger) + s.metadataRepository = NewMetadataRepository(pool, log.DefaultLogger) } func (s *TestSuite) TearDownSuite() { diff --git a/internal/data/registry_test.go b/internal/data/data_test.go similarity index 74% rename from internal/data/registry_test.go rename to internal/data/data_test.go index dca24a9..df49868 100644 --- a/internal/data/registry_test.go +++ b/internal/data/data_test.go @@ -6,6 +6,8 @@ import ( "testing" v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" + "github.com/pbufio/pbuf-registry/internal/model" + "github.com/pbufio/pbuf-registry/internal/utils" ) const ( @@ -15,7 +17,7 @@ const ( var protofiles = []*v1.ProtoFile{ { Filename: "hello/test.proto", - Content: "syntax = \"proto3\";", + Content: "syntax = \"proto3\"; package hello; message Hello {}", }, } @@ -712,6 +714,14 @@ func Test_registryRepository_DeleteModule(t *testing.T) { }, wantErr: false, }, + { + name: "Delete module 2", + args: args{ + ctx: context.Background(), + name: "pbuf.io/pbuf-registry-2", + }, + wantErr: false, + }, { name: "Delete module not found", args: args{ @@ -731,3 +741,228 @@ func Test_registryRepository_DeleteModule(t *testing.T) { }) } } + +func Test_metadataRepo_GetUnprocessedTagIds(t *testing.T) { + type args struct { + moduleName string + tagIds []string + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "get unprocessed tag ids", + args: args{ + moduleName: "test-module", + tagIds: []string{ + "test-tag-1", + "test-tag-2", + "test-tag-3", + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := suite.metadataRepository + + err := suite.registryRepository.RegisterModule(context.Background(), tt.args.moduleName) + if err != nil { + t.Errorf("error registering module: %v", err) + return + } + + for _, tagId := range tt.args.tagIds { + _, err := suite.registryRepository.PushModule( + context.Background(), + tt.args.moduleName, + tagId, + protofiles) + if err != nil { + t.Errorf("error pushing module: %v", err) + return + } + } + + got, err := m.GetUnprocessedTagIds(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("GetUnprocessedTagIds() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if len(got) != len(tt.args.tagIds) { + t.Errorf("GetUnprocessedTagIds() got = %v, want %v", got, tt.args.tagIds) + } + }) + } +} + +func Test_metadataRepo_GetProtoFilesForTagId(t *testing.T) { + var noProtoFiles []*v1.ProtoFile + + type args struct { + tagId string + } + tests := []struct { + name string + args args + want []*v1.ProtoFile + wantErr bool + }{ + { + name: "get proto files for tag id", + args: args{ + tagId: fakeUUID, + }, + want: protofiles, + wantErr: false, + }, + { + name: "get proto files for tag id not found", + args: args{ + tagId: "not-found", + }, + want: noProtoFiles, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repository := suite.metadataRepository + + tagIds, err := repository.GetUnprocessedTagIds(context.Background()) + if err != nil { + t.Errorf("error getting unprocessed tag ids: %v", err) + return + } + + tagId := tt.args.tagId + if tt.args.tagId == fakeUUID { + tagId = tagIds[0] + } + + got, err := repository.GetProtoFilesForTagId(context.Background(), tagId) + if (err != nil) != tt.wantErr { + t.Errorf("GetProtoFilesForTagId() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetProtoFilesForTagId() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_metadataRepo_SaveParsedProtoFiles(t *testing.T) { + type args struct { + tagId string + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "save parsed proto files", + args: args{ + tagId: fakeUUID, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repository := suite.metadataRepository + + tagIds, err := repository.GetUnprocessedTagIds(context.Background()) + if err != nil { + t.Errorf("error getting unprocessed tag ids: %v", err) + return + } + + tagId := tt.args.tagId + if tt.args.tagId == fakeUUID { + tagId = tagIds[0] + } + + parsedProtoFiles, err := utils.ParseProtoFilesContents(protofiles) + if err != nil { + t.Errorf("error parsing proto files: %v", err) + return + } + + err = repository.SaveParsedProtoFiles(context.Background(), tagId, parsedProtoFiles) + if (err != nil) != tt.wantErr { + t.Errorf("SaveParsedProtoFiles() error = %v, wantErr %v", err, tt.wantErr) + return + } + }) + } +} + +func Test_metadataRepo_GetTagMeta(t *testing.T) { + noImports := []string{} + noRefPackages := []string{} + + tests := []struct { + name string + want *model.TagMeta + wantErr bool + }{ + { + name: "get tag meta", + want: &model.TagMeta{ + Packages: []string{"hello"}, + Imports: noImports, + RefPackages: noRefPackages, + FilesMeta: []*model.FileMeta{ + { + Filename: "hello/test.proto", + Packages: []string{"hello"}, + Imports: noImports, + RefPackages: noRefPackages, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repository := suite.metadataRepository + + tagIds, err := repository.GetUnprocessedTagIds(context.Background()) + if err != nil { + t.Errorf("error getting unprocessed tag ids: %v", err) + return + } + + parsedProtoFiles, err := utils.ParseProtoFilesContents(protofiles) + if err != nil { + t.Errorf("error parsing proto files: %v", err) + return + } + + tagId := tagIds[0] + err = repository.SaveParsedProtoFiles(context.Background(), tagId, parsedProtoFiles) + if (err != nil) != tt.wantErr { + t.Errorf("SaveParsedProtoFiles() error = %v, wantErr %v", err, tt.wantErr) + return + } + + tagMeta, err := repository.GetTagMetaByTagId(context.Background(), tagId) + if (err != nil) != tt.wantErr { + t.Errorf("GetTagMetaByTagId() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(&tagMeta, &tt.want) { + t.Errorf("GetTagMetaByTagId() got = %v, want %v", tagMeta, tt.want) + } + }) + } +} diff --git a/internal/data/metadata.go b/internal/data/metadata.go new file mode 100644 index 0000000..1c7c374 --- /dev/null +++ b/internal/data/metadata.go @@ -0,0 +1,158 @@ +package data + +import ( + "context" + "errors" + + "github.com/go-kratos/kratos/v2/log" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" + "github.com/pbufio/pbuf-registry/internal/model" + "github.com/pbufio/pbuf-registry/internal/utils" +) + +type MetadataRepository interface { + GetUnprocessedTagIds(ctx context.Context) ([]string, error) + GetProtoFilesForTagId(ctx context.Context, tagId string) ([]*v1.ProtoFile, error) + SaveParsedProtoFiles(ctx context.Context, tagId string, files []*model.ParsedProtoFile) error + GetTagMetaByTagId(ctx context.Context, tagId string) (*model.TagMeta, error) +} + +type metadataRepo struct { + pool *pgxpool.Pool + logger *log.Helper +} + +func (m metadataRepo) GetUnprocessedTagIds(ctx context.Context) ([]string, error) { + var tagIds []string + + rows, err := m.pool.Query(ctx, "SELECT id FROM tags WHERE is_processed = false") + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return tagIds, nil + } + m.logger.Errorf("error getting unprocessed tag ids: %v", err) + return nil, err + } + defer rows.Close() + + for rows.Next() { + var tagId string + err = rows.Scan(&tagId) + if err != nil { + m.logger.Errorf("error scanning tag id: %v", err) + return nil, err + } + tagIds = append(tagIds, tagId) + } + + return tagIds, nil +} + +func (m metadataRepo) GetProtoFilesForTagId(ctx context.Context, tagId string) ([]*v1.ProtoFile, error) { + var protoFiles []*v1.ProtoFile + + rows, err := m.pool.Query(ctx, "SELECT filename, content FROM protofiles WHERE tag_id = $1", tagId) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return protoFiles, nil + } + m.logger.Errorf("error getting proto files for tag id %s: %v", tagId, err) + return nil, err + } + defer rows.Close() + + for rows.Next() { + var filename, content string + err = rows.Scan(&filename, &content) + if err != nil { + m.logger.Errorf("error scanning proto file: %v", err) + return nil, err + } + protoFiles = append(protoFiles, &v1.ProtoFile{ + Filename: filename, + Content: content, + }) + } + + return protoFiles, nil +} + +func (m metadataRepo) SaveParsedProtoFiles(ctx context.Context, tagId string, files []*model.ParsedProtoFile) error { + meta, err := utils.RetrieveMeta(files) + if err != nil { + m.logger.Errorf("error retrieving tag meta: %v", err) + return err + } + + tx, err := m.pool.Begin(ctx) + if err != nil { + m.logger.Errorf("error starting transaction: %v", err) + return err + } + defer func(tx pgx.Tx, ctx context.Context) { + err := tx.Rollback(ctx) + if err != nil { + if !errors.Is(err, pgx.ErrTxClosed) { + m.logger.Errorf("error rolling back transaction: %v", err) + } + } + }(tx, ctx) + + for _, file := range files { + // add to proto_parsed_data table + // on duplicate update json + _, err = tx.Exec(ctx, "INSERT INTO proto_parsed_data (tag_id, filename, json) VALUES ($1, $2, $3) ON CONFLICT (tag_id, filename) DO UPDATE SET json = $3", tagId, file.Filename, file.ProtoJson) + if err != nil { + m.logger.Errorf("error inserting parsed proto file: %v", err) + return err + } + } + + // add to tag_meta table + // on duplicate update json + _, err = tx.Exec(ctx, "INSERT INTO tag_meta (tag_id, meta) VALUES ($1, $2) ON CONFLICT (tag_id) DO UPDATE SET meta = $2", tagId, meta) + if err != nil { + m.logger.Errorf("error inserting tag meta: %v", err) + return err + } + + // update tag to processed + _, err = tx.Exec(ctx, "UPDATE tags SET is_processed = true WHERE id = $1", tagId) + if err != nil { + m.logger.Errorf("error updating tag to processed: %v", err) + return err + } + + err = tx.Commit(ctx) + if err != nil { + m.logger.Errorf("error committing transaction: %v", err) + return err + } + + return nil +} + +func (m *metadataRepo) GetTagMetaByTagId(ctx context.Context, tagId string) (*model.TagMeta, error) { + var meta model.TagMeta + + err := m.pool.QueryRow(ctx, "SELECT meta FROM tag_meta WHERE tag_id = $1", tagId).Scan(&meta) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + m.logger.Errorf("error getting tag meta for tag id %s: %v", tagId, err) + return nil, err + } + + return &meta, nil +} + +// NewMetadataRepository create a new metadata repository with pool +func NewMetadataRepository(pool *pgxpool.Pool, logger log.Logger) MetadataRepository { + return &metadataRepo{ + pool: pool, + logger: log.NewHelper(log.With(logger, "module", "data/MetadataRepository")), + } +} diff --git a/internal/data/registry.go b/internal/data/registry.go index a53cf69..28907f0 100644 --- a/internal/data/registry.go +++ b/internal/data/registry.go @@ -24,25 +24,26 @@ type RegistryRepository interface { PushDraftModule(ctx context.Context, name string, tag string, protofiles []*v1.ProtoFile, dependencies []*v1.Dependency) (*v1.Module, error) PullModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) PullDraftModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) + GetModuleTagId(ctx context.Context, moduleName string, tag string) (string, error) DeleteModuleTag(ctx context.Context, name string, tag string) error AddModuleDependencies(ctx context.Context, name string, tag string, dependencies []*v1.Dependency) error GetModuleDependencies(ctx context.Context, name string, tag string) ([]*v1.Dependency, error) DeleteObsoleteDraftTags(ctx context.Context) error } -type registryRepository struct { +type registryRepo struct { pool *pgxpool.Pool logger *log.Helper } func NewRegistryRepository(pool *pgxpool.Pool, logger log.Logger) RegistryRepository { - return ®istryRepository{ + return ®istryRepo{ pool: pool, logger: log.NewHelper(log.With(logger, "module", "data/RegistryRepository")), } } -func (r *registryRepository) RegisterModule(ctx context.Context, moduleName string) error { +func (r *registryRepo) RegisterModule(ctx context.Context, moduleName string) error { // Insert module _, err := r.pool.Exec(ctx, "INSERT INTO modules (name) VALUES ($1) ON CONFLICT (name) DO NOTHING", @@ -54,7 +55,7 @@ func (r *registryRepository) RegisterModule(ctx context.Context, moduleName stri return nil } -func (r *registryRepository) GetModule(ctx context.Context, name string) (*v1.Module, error) { +func (r *registryRepo) GetModule(ctx context.Context, name string) (*v1.Module, error) { var module v1.Module err := r.pool.QueryRow(ctx, "SELECT id, name FROM modules WHERE name = $1", @@ -118,7 +119,7 @@ func (r *registryRepository) GetModule(ctx context.Context, name string) (*v1.Mo // ListModules returns a list of modules with paging support // Token is the base64 encoded module name -func (r *registryRepository) ListModules(ctx context.Context, pageSize int, token string) ([]*v1.Module, string, error) { +func (r *registryRepo) ListModules(ctx context.Context, pageSize int, token string) ([]*v1.Module, string, error) { var modules []*v1.Module query := "SELECT id, name FROM modules" @@ -159,7 +160,7 @@ func (r *registryRepository) ListModules(ctx context.Context, pageSize int, toke return modules, nextPageToken, nil } -func (r *registryRepository) DeleteModule(ctx context.Context, name string) error { +func (r *registryRepo) DeleteModule(ctx context.Context, name string) error { // delete all protofiles res, err := r.pool.Exec(ctx, "DELETE FROM protofiles WHERE tag_id IN (SELECT id FROM tags WHERE module_id = (SELECT id FROM modules WHERE name = $1))", @@ -225,7 +226,7 @@ func (r *registryRepository) DeleteModule(ctx context.Context, name string) erro return nil } -func (r *registryRepository) PushModule(ctx context.Context, name string, tag string, protofiles []*v1.ProtoFile) (*v1.Module, error) { +func (r *registryRepo) PushModule(ctx context.Context, name string, tag string, protofiles []*v1.ProtoFile) (*v1.Module, error) { // check if module exists module, err := r.GetModule(ctx, name) if err != nil { @@ -243,8 +244,21 @@ func (r *registryRepository) PushModule(ctx context.Context, name string, tag st } } + tx, err := r.pool.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("could not begin transaction: %w", err) + } + defer func(tx pgx.Tx, ctx context.Context) { + err := tx.Rollback(ctx) + if err != nil { + if !errors.Is(err, pgx.ErrTxClosed) { + r.logger.Errorf("could not rollback transaction: %w", err) + } + } + }(tx, context.Background()) + // create the tag - _, err = r.pool.Exec(ctx, "INSERT INTO tags (module_id, tag) VALUES ($1, $2)", module.Id, tag) + _, err = tx.Exec(ctx, "INSERT INTO tags (module_id, tag) VALUES ($1, $2)", module.Id, tag) if err != nil { return nil, fmt.Errorf("could not insert tag into database: %w", err) } @@ -252,7 +266,7 @@ func (r *registryRepository) PushModule(ctx context.Context, name string, tag st var tagId string // fetch the new tag - err = r.pool.QueryRow(ctx, + err = tx.QueryRow(ctx, "SELECT id FROM tags WHERE module_id = $1 AND tag = $2", module.Id, tag).Scan(&tagId) if err != nil { @@ -261,7 +275,7 @@ func (r *registryRepository) PushModule(ctx context.Context, name string, tag st // insert protofiles for _, protofile := range protofiles { - _, err = r.pool.Exec(ctx, + _, err = tx.Exec(ctx, "INSERT INTO protofiles (tag_id, filename, content) VALUES ($1, $2, $3)", tagId, protofile.Filename, protofile.Content) if err != nil { @@ -269,12 +283,18 @@ func (r *registryRepository) PushModule(ctx context.Context, name string, tag st } } + err = tx.Commit(ctx) + if err != nil { + r.logger.Errorf("could not commit transaction: %w", err) + return nil, fmt.Errorf("could not push the module. internal error") + } + module.Tags = append(module.Tags, tag) return module, nil } -func (r *registryRepository) PullModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) { +func (r *registryRepo) PullModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) { // check if module exists module, err := r.GetModule(ctx, name) if err != nil { @@ -285,7 +305,7 @@ func (r *registryRepository) PullModule(ctx context.Context, name string, tag st return nil, nil, errors.New("module not found") } - tagId, err := r.getModuleTagId(ctx, name, tag) + tagId, err := r.GetModuleTagId(ctx, name, tag) if err != nil { return nil, nil, fmt.Errorf("could not get tag id: %w", err) } @@ -317,7 +337,7 @@ func (r *registryRepository) PullModule(ctx context.Context, name string, tag st return module, protofiles, nil } -func (r *registryRepository) PullDraftModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) { +func (r *registryRepo) PullDraftModule(ctx context.Context, name string, tag string) (*v1.Module, []*v1.ProtoFile, error) { // fetch module with GetModule method and get all other from draft_tags table module, err := r.GetModule(ctx, name) if err != nil { @@ -351,8 +371,8 @@ func (r *registryRepository) PullDraftModule(ctx context.Context, name string, t return module, protofiles, nil } -func (r *registryRepository) DeleteModuleTag(ctx context.Context, name string, tag string) error { - tagId, err := r.getModuleTagId(ctx, name, tag) +func (r *registryRepo) DeleteModuleTag(ctx context.Context, name string, tag string) error { + tagId, err := r.GetModuleTagId(ctx, name, tag) if err != nil { return fmt.Errorf("could not get tag id: %w", err) } @@ -402,9 +422,9 @@ func (r *registryRepository) DeleteModuleTag(ctx context.Context, name string, t return nil } -func (r *registryRepository) AddModuleDependencies(ctx context.Context, name string, tag string, dependencies []*v1.Dependency) error { +func (r *registryRepo) AddModuleDependencies(ctx context.Context, name string, tag string, dependencies []*v1.Dependency) error { // find the tag id by name and tag - tagId, err := r.getModuleTagId(ctx, name, tag) + tagId, err := r.GetModuleTagId(ctx, name, tag) if err != nil { return fmt.Errorf("could not get tag id: %w", err) } @@ -434,9 +454,8 @@ func (r *registryRepository) AddModuleDependencies(ctx context.Context, name str return nil } -func (r *registryRepository) GetModuleDependencies(ctx context.Context, name string, tag string) ([]*v1.Dependency, error) { +func (r *registryRepo) GetModuleDependencies(ctx context.Context, name string, tag string) ([]*v1.Dependency, error) { var dependencies []*v1.Dependency - // find the latest tag if tag is empty if tag == "" { err := r.pool.QueryRow(ctx, @@ -450,7 +469,7 @@ func (r *registryRepository) GetModuleDependencies(ctx context.Context, name str } } - tagId, err := r.getModuleTagId(ctx, name, tag) + tagId, err := r.GetModuleTagId(ctx, name, tag) if err != nil { return nil, fmt.Errorf("could not get tag id: %w", err) } @@ -497,7 +516,7 @@ func (r *registryRepository) GetModuleDependencies(ctx context.Context, name str return dependencies, nil } -func (r *registryRepository) PushDraftModule(ctx context.Context, name string, tag string, protofiles []*v1.ProtoFile, dependencies []*v1.Dependency) (*v1.Module, error) { +func (r *registryRepo) PushDraftModule(ctx context.Context, name string, tag string, protofiles []*v1.ProtoFile, dependencies []*v1.Dependency) (*v1.Module, error) { // check if module exists module, err := r.GetModule(ctx, name) if err != nil { @@ -544,7 +563,7 @@ func (r *registryRepository) PushDraftModule(ctx context.Context, name string, t return module, nil } -func (r *registryRepository) getModuleTagId(ctx context.Context, moduleName string, tag string) (string, error) { +func (r *registryRepo) GetModuleTagId(ctx context.Context, moduleName string, tag string) (string, error) { // check if tag exists var tagId string err := r.pool.QueryRow(ctx, @@ -562,7 +581,7 @@ func (r *registryRepository) getModuleTagId(ctx context.Context, moduleName stri // DeleteObsoleteDraftTags deletes all draft tags // that are older than 7 days -func (r *registryRepository) DeleteObsoleteDraftTags(ctx context.Context) error { +func (r *registryRepo) DeleteObsoleteDraftTags(ctx context.Context) error { res, err := r.pool.Exec(ctx, "DELETE FROM draft_tags WHERE updated_at < NOW() - INTERVAL '7 days'") if err != nil { diff --git a/internal/mocks/daemon.go b/internal/mocks/daemon.go new file mode 100644 index 0000000..3caae36 --- /dev/null +++ b/internal/mocks/daemon.go @@ -0,0 +1,60 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Daemon is an autogenerated mock type for the Daemon type +type Daemon struct { + mock.Mock +} + +// Name provides a mock function with given fields: +func (_m *Daemon) Name() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Name") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Run provides a mock function with given fields: +func (_m *Daemon) Run() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Run") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewDaemon creates a new instance of Daemon. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDaemon(t interface { + mock.TestingT + Cleanup(func()) +}) *Daemon { + mock := &Daemon{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/metadata_repository.go b/internal/mocks/metadata_repository.go new file mode 100644 index 0000000..46bcaf7 --- /dev/null +++ b/internal/mocks/metadata_repository.go @@ -0,0 +1,140 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + model "github.com/pbufio/pbuf-registry/internal/model" + + v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" +) + +// MetadataRepository is an autogenerated mock type for the MetadataRepository type +type MetadataRepository struct { + mock.Mock +} + +// GetProtoFilesForTagId provides a mock function with given fields: ctx, tagId +func (_m *MetadataRepository) GetProtoFilesForTagId(ctx context.Context, tagId string) ([]*v1.ProtoFile, error) { + ret := _m.Called(ctx, tagId) + + if len(ret) == 0 { + panic("no return value specified for GetProtoFilesForTagId") + } + + var r0 []*v1.ProtoFile + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*v1.ProtoFile, error)); ok { + return rf(ctx, tagId) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []*v1.ProtoFile); ok { + r0 = rf(ctx, tagId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*v1.ProtoFile) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, tagId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTagMetaByTagId provides a mock function with given fields: ctx, tagId +func (_m *MetadataRepository) GetTagMetaByTagId(ctx context.Context, tagId string) (*model.TagMeta, error) { + ret := _m.Called(ctx, tagId) + + if len(ret) == 0 { + panic("no return value specified for GetTagMetaByTagId") + } + + var r0 *model.TagMeta + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*model.TagMeta, error)); ok { + return rf(ctx, tagId) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *model.TagMeta); ok { + r0 = rf(ctx, tagId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.TagMeta) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, tagId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetUnprocessedTagIds provides a mock function with given fields: ctx +func (_m *MetadataRepository) GetUnprocessedTagIds(ctx context.Context) ([]string, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetUnprocessedTagIds") + } + + var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []string); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SaveParsedProtoFiles provides a mock function with given fields: ctx, tagId, files +func (_m *MetadataRepository) SaveParsedProtoFiles(ctx context.Context, tagId string, files []*model.ParsedProtoFile) error { + ret := _m.Called(ctx, tagId, files) + + if len(ret) == 0 { + panic("no return value specified for SaveParsedProtoFiles") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, []*model.ParsedProtoFile) error); ok { + r0 = rf(ctx, tagId, files) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewMetadataRepository creates a new instance of MetadataRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMetadataRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *MetadataRepository { + mock := &MetadataRepository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/registry_repository.go b/internal/mocks/registry_repository.go index cab4105..53556d0 100644 --- a/internal/mocks/registry_repository.go +++ b/internal/mocks/registry_repository.go @@ -147,6 +147,34 @@ func (_m *RegistryRepository) GetModuleDependencies(ctx context.Context, name st return r0, r1 } +// GetModuleTagId provides a mock function with given fields: ctx, moduleName, tag +func (_m *RegistryRepository) GetModuleTagId(ctx context.Context, moduleName string, tag string) (string, error) { + ret := _m.Called(ctx, moduleName, tag) + + if len(ret) == 0 { + panic("no return value specified for GetModuleTagId") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (string, error)); ok { + return rf(ctx, moduleName, tag) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok { + r0 = rf(ctx, moduleName, tag) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, moduleName, tag) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ListModules provides a mock function with given fields: ctx, pageSize, token func (_m *RegistryRepository) ListModules(ctx context.Context, pageSize int, token string) ([]*v1.Module, string, error) { ret := _m.Called(ctx, pageSize, token) diff --git a/internal/model/proto.go b/internal/model/proto.go new file mode 100644 index 0000000..e0425e2 --- /dev/null +++ b/internal/model/proto.go @@ -0,0 +1,25 @@ +package model + +import ( + "github.com/yoheimuta/go-protoparser/v4/interpret/unordered" +) + +type ParsedProtoFile struct { + Filename string + Proto *unordered.Proto + ProtoJson string +} + +type TagMeta struct { + Packages []string + Imports []string + RefPackages []string + FilesMeta []*FileMeta +} + +type FileMeta struct { + Filename string + Packages []string + Imports []string + RefPackages []string +} diff --git a/internal/server/registry.go b/internal/server/registry.go index 27e8bf4..f93d18f 100644 --- a/internal/server/registry.go +++ b/internal/server/registry.go @@ -18,12 +18,14 @@ const ( type RegistryServer struct { v1.UnimplementedRegistryServer registryRepository data.RegistryRepository + metadataRepository data.MetadataRepository logger *log.Helper } -func NewRegistryServer(registryRepository data.RegistryRepository, logger log.Logger) *RegistryServer { +func NewRegistryServer(registryRepository data.RegistryRepository, metadataRepository data.MetadataRepository, logger log.Logger) *RegistryServer { return &RegistryServer{ registryRepository: registryRepository, + metadataRepository: metadataRepository, logger: log.NewHelper(log.With(logger, "module", "server/RegistryServer")), } } @@ -63,6 +65,24 @@ func (r *RegistryServer) GetModule(ctx context.Context, request *v1.GetModuleReq return nil, errors.New("module not found") } + if len(module.Tags) > 0 { + tagId, err := r.registryRepository.GetModuleTagId(ctx, name, module.Tags[0]) + if err != nil { + r.logger.Infof("error getting module tag id: %v", err) + return nil, err + } + + tagMeta, err := r.metadataRepository.GetTagMetaByTagId(ctx, tagId) + if err != nil { + r.logger.Infof("error getting tag meta: %v", err) + return nil, err + } + + if tagMeta != nil { + module.Packages = tagMeta.Packages + } + } + return module, nil } @@ -113,7 +133,7 @@ func (r *RegistryServer) PushModule(ctx context.Context, request *v1.PushModuleR return nil, errors.New("tag cannot be empty") } - err := utils.ValidateProtoFiles(request.Protofiles) + err := utils.ValidateProtoFiles(request.Protofiles, r.logger) if err != nil { return nil, err } diff --git a/internal/server/registry_test.go b/internal/server/registry_test.go index f955ffc..b06d4b5 100644 --- a/internal/server/registry_test.go +++ b/internal/server/registry_test.go @@ -23,7 +23,9 @@ func setup(ctx context.Context) (v1.RegistryClient, func()) { registryRepository := &mocks.RegistryRepository{} registryRepository.On("RegisterModule", mock.Anything, "hello").Return(nil) - v1.RegisterRegistryServer(baseServer, NewRegistryServer(registryRepository, nil)) + metadataRepository := &mocks.MetadataRepository{} + + v1.RegisterRegistryServer(baseServer, NewRegistryServer(registryRepository, metadataRepository, nil)) go func() { if err := baseServer.Serve(lis); err != nil { log.Printf("error serving server: %v", err) diff --git a/internal/utils/checkers.go b/internal/utils/checkers.go index db0d2c9..bad7a57 100644 --- a/internal/utils/checkers.go +++ b/internal/utils/checkers.go @@ -3,15 +3,13 @@ package utils import ( "errors" "fmt" - "strings" - "github.com/google/martian/log" + "github.com/go-kratos/kratos/v2/log" v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" - "github.com/yoheimuta/go-protoparser/v4" ) // ValidateProtoFiles validates proto files -func ValidateProtoFiles(protoFiles []*v1.ProtoFile) error { +func ValidateProtoFiles(protoFiles []*v1.ProtoFile, logger *log.Helper) error { for _, protoFile := range protoFiles { if protoFile.Filename == "" { return errors.New("filename cannot be empty") @@ -22,7 +20,7 @@ func ValidateProtoFiles(protoFiles []*v1.ProtoFile) error { } // check that file contents is valid - err := parseProtoFile(protoFile.Filename, protoFile.Content) + _, err := parseProtoFile(protoFile.Content) if err != nil { return fmt.Errorf("invalid proto file %s: %w", protoFile.Filename, err) } @@ -30,13 +28,3 @@ func ValidateProtoFiles(protoFiles []*v1.ProtoFile) error { return nil } - -func parseProtoFile(filename, content string) error { - _, err := protoparser.Parse(strings.NewReader(content)) - if err != nil { - log.Infof("error parsing proto file %s: %v", filename, err) - return err - } - - return nil -} diff --git a/internal/utils/checkers_test.go b/internal/utils/checkers_test.go index 2414add..ea008c1 100644 --- a/internal/utils/checkers_test.go +++ b/internal/utils/checkers_test.go @@ -66,7 +66,7 @@ func TestValidateProtoFiles(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := ValidateProtoFiles(tt.args.protoFiles); (err != nil) != tt.wantErr { + if err := ValidateProtoFiles(tt.args.protoFiles, nil); (err != nil) != tt.wantErr { t.Errorf("ValidateProtoFiles() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/internal/utils/meta.go b/internal/utils/meta.go new file mode 100644 index 0000000..cdf343f --- /dev/null +++ b/internal/utils/meta.go @@ -0,0 +1,183 @@ +package utils + +import ( + "sort" + "strings" + + "github.com/pbufio/pbuf-registry/internal/model" + "github.com/yoheimuta/go-protoparser/v4/interpret/unordered" + "github.com/yoheimuta/go-protoparser/v4/parser" + "golang.org/x/exp/maps" +) + +// RetrieveMeta retrieves metadata from parsed proto files +func RetrieveMeta(files []*model.ParsedProtoFile) (*model.TagMeta, error) { + packages := make(map[string]bool) + imports := make(map[string]bool) + refPackages := make(map[string]bool) + + filesMeta := make([]*model.FileMeta, len(files)) + + for i, file := range files { + filePackages := make(map[string]bool) + fileImports := make(map[string]bool) + fileRefPackages := make(map[string]bool) + + protoBody := file.Proto.ProtoBody + + for _, pkg := range protoBody.Packages { + packages[pkg.Name] = true + filePackages[pkg.Name] = true + } + + for _, imp := range protoBody.Imports { + location := strings.Trim(imp.Location, "\"") + imports[location] = true + fileImports[location] = true + } + + var retrievedRefPackages []string + + retrievedRefPackages = append(retrievedRefPackages, getRefPackagesFromOptions(protoBody.Options)...) + retrievedRefPackages = append(retrievedRefPackages, getRefPackagesForMessages(protoBody.Messages)...) + retrievedRefPackages = append(retrievedRefPackages, getRefPackagesForEnums(protoBody.Enums)...) + + for _, service := range protoBody.Services { + retrievedRefPackages = append(retrievedRefPackages, getRefPackagesFromOptions(service.ServiceBody.Options)...) + for _, rpc := range service.ServiceBody.RPCs { + retrievedRefPackages = append(retrievedRefPackages, getRefPackagesFromOptions(rpc.Options)...) + retrievedRefPackages = append(retrievedRefPackages, getRefPackageName(rpc.RPCRequest.MessageType)) + retrievedRefPackages = append(retrievedRefPackages, getRefPackageName(rpc.RPCResponse.MessageType)) + } + } + + for _, refPkg := range retrievedRefPackages { + if refPkg != "" { + refPackages[refPkg] = true + fileRefPackages[refPkg] = true + } + } + + uniquePackages := maps.Keys(filePackages) + sort.Strings(uniquePackages) + + uniqueImports := maps.Keys(fileImports) + sort.Strings(uniqueImports) + + uniqueRefPackages := maps.Keys(fileRefPackages) + sort.Strings(uniqueRefPackages) + + filesMeta[i] = &model.FileMeta{ + Filename: file.Filename, + Packages: uniquePackages, + Imports: uniqueImports, + RefPackages: uniqueRefPackages, + } + } + + uniquePackages := maps.Keys(packages) + sort.Strings(uniquePackages) + + uniqueImports := maps.Keys(imports) + sort.Strings(uniqueImports) + + uniqueRefPackages := maps.Keys(refPackages) + sort.Strings(uniqueRefPackages) + + return &model.TagMeta{ + Packages: uniquePackages, + Imports: uniqueImports, + RefPackages: uniqueRefPackages, + FilesMeta: filesMeta, + }, nil +} + +func getRefPackagesForMessages(messages []*unordered.Message) []string { + var names []string + for _, message := range messages { + names = append(names, getRefPackagesForMessages(message.MessageBody.Messages)...) + + names = append(names, getRefPackagesFromOptions(message.MessageBody.Options)...) + names = append(names, getRefPackagesForEnums(message.MessageBody.Enums)...) + names = append(names, getRefPackagesForFields(message.MessageBody.Fields)...) + names = append(names, getRefPackagesForOneOfs(message.MessageBody.Oneofs)...) + names = append(names, getRefPackagesForMaps(message.MessageBody.Maps)...) + } + + return names +} + +func getRefPackagesForMaps(fields []*parser.MapField) []string { + var names []string + for _, field := range fields { + names = append(names, getRefPackagesFromFieldOptions(field.FieldOptions)...) + names = append(names, getRefPackageName(field.Type)) + } + + return names +} + +func getRefPackagesForFields(fields []*parser.Field) []string { + var names []string + for _, field := range fields { + names = append(names, getRefPackagesFromFieldOptions(field.FieldOptions)...) + names = append(names, getRefPackageName(field.Type)) + } + + return names +} + +func getRefPackagesForOneOfs(oneofs []*parser.Oneof) []string { + var names []string + for _, oneof := range oneofs { + names = append(names, getRefPackagesFromOptions(oneof.Options)...) + + for _, field := range oneof.OneofFields { + names = append(names, getRefPackagesFromFieldOptions(field.FieldOptions)...) + names = append(names, getRefPackageName(field.Type)) + } + } + + return names +} + +func getRefPackagesForEnums(enums []*unordered.Enum) []string { + var names []string + for _, enum := range enums { + names = append(names, getRefPackagesFromOptions(enum.EnumBody.Options)...) + } + + return names +} + +func getRefPackagesFromOptions(options []*parser.Option) []string { + names := make([]string, len(options)) + for i, option := range options { + names[i] = getRefPackageName(option.OptionName) + } + + return names +} + +func getRefPackagesFromFieldOptions(options []*parser.FieldOption) []string { + var names []string + for _, option := range options { + names = append(names, getRefPackageName(option.OptionName)) + } + + return names +} + +func getRefPackageName(name string) string { + if strings.HasPrefix(name, "(") { + name = strings.Split(name, "(")[1] + name = strings.Split(name, ")")[0] + } + + if strings.Contains(name, ".") { + split := strings.Split(name, ".") + return strings.TrimSpace(strings.Join(split[:len(split)-1], ".")) + } + + return "" +} diff --git a/internal/utils/meta_test.go b/internal/utils/meta_test.go new file mode 100644 index 0000000..9400dba --- /dev/null +++ b/internal/utils/meta_test.go @@ -0,0 +1,92 @@ +package utils + +import ( + "reflect" + "testing" + + v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" + "github.com/pbufio/pbuf-registry/internal/model" +) + +func TestRetrieveMeta(t *testing.T) { + parsedProtoFiles, err := ParseProtoFilesContents([]*v1.ProtoFile{ + { + Filename: "test.proto", + Content: complexProtoFile, + }, + }) + if err != nil { + t.Errorf("ParseProtoFilesContents() error = %v", err) + return + } + + type args struct { + files []*model.ParsedProtoFile + } + tests := []struct { + name string + args args + want *model.TagMeta + wantErr bool + }{ + { + name: "complex proto file", + args: args{ + files: parsedProtoFiles, + }, + want: &model.TagMeta{ + Packages: []string{"pbufregistry.v1"}, + Imports: []string{"google/api/annotations.proto", "pbuf-registry/v1/entities.proto"}, + RefPackages: []string{"google.api", "test1", "test2", "validate"}, + FilesMeta: []*model.FileMeta{ + { + Filename: "test.proto", + Packages: []string{"pbufregistry.v1"}, + Imports: []string{"google/api/annotations.proto", "pbuf-registry/v1/entities.proto"}, + RefPackages: []string{"google.api", "test1", "test2", "validate"}, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := RetrieveMeta(tt.args.files) + if (err != nil) != tt.wantErr { + t.Errorf("RetrieveMeta() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(&got, &tt.want) { + t.Errorf("RetrieveMeta() got = %v, want %v", got, tt.want) + } + }) + } +} + +const complexProtoFile = ` +syntax = "proto3"; + +package pbufregistry.v1; + +import "google/api/annotations.proto"; +import "pbuf-registry/v1/entities.proto"; + +option go_package = "pbufregistry/api/v1;v1"; + +// Registry service definition +service Registry { + // List all registered modules + rpc ListModules(test2.ListModulesRequest) returns (ListModulesResponse) { + option (google.api.http) = { + get: "/v1/modules" + }; + } +} + +// ListModulesResponse is the response message for ListModules. +message ListModulesResponse { + // The modules requested. + repeated test1.Module modules = 1; + string next_page_token = 2 [json_name = "next_page_token", (validate.rules).string.uuid = true]; +}` diff --git a/internal/utils/parsers.go b/internal/utils/parsers.go new file mode 100644 index 0000000..d9b7256 --- /dev/null +++ b/internal/utils/parsers.go @@ -0,0 +1,56 @@ +package utils + +import ( + "encoding/json" + "fmt" + "strings" + + v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" + "github.com/pbufio/pbuf-registry/internal/model" + "github.com/yoheimuta/go-protoparser/v4" + "github.com/yoheimuta/go-protoparser/v4/interpret/unordered" + "github.com/yoheimuta/go-protoparser/v4/parser" +) + +func ParseProtoFilesContents(protoFiles []*v1.ProtoFile) ([]*model.ParsedProtoFile, error) { + parsedProtoFiles := make([]*model.ParsedProtoFile, len(protoFiles)) + + for i, protoFile := range protoFiles { + if protoFile.Content == "" { + return parsedProtoFiles, fmt.Errorf("content of %s cannot be empty", protoFile.Filename) + } + + // check that file contents is valid + parsed, err := parseProtoFile(protoFile.Content) + if err != nil { + return parsedProtoFiles, fmt.Errorf("invalid proto file %s: %w", protoFile.Filename, err) + } + + proto, err := unordered.InterpretProto(parsed) + if err != nil { + return parsedProtoFiles, fmt.Errorf("cannot interpret file %s: %w", protoFile.Filename, err) + } + + jsonContent, err := json.Marshal(proto) + if err != nil { + return parsedProtoFiles, fmt.Errorf("cannot marshall %s: %w", protoFile.Filename, err) + } + + parsedProtoFiles[i] = &model.ParsedProtoFile{ + Filename: protoFile.Filename, + Proto: proto, + ProtoJson: string(jsonContent), + } + } + + return parsedProtoFiles, nil +} + +func parseProtoFile(content string) (*parser.Proto, error) { + parsed, err := protoparser.Parse(strings.NewReader(content)) + if err != nil { + return nil, err + } + + return parsed, nil +} diff --git a/internal/utils/parsers_test.go b/internal/utils/parsers_test.go new file mode 100644 index 0000000..6ed10e5 --- /dev/null +++ b/internal/utils/parsers_test.go @@ -0,0 +1,89 @@ +package utils + +import ( + "reflect" + "testing" + + v1 "github.com/pbufio/pbuf-registry/gen/pbuf-registry/v1" + "github.com/pbufio/pbuf-registry/internal/model" + "github.com/yoheimuta/go-protoparser/v4/interpret/unordered" + "github.com/yoheimuta/go-protoparser/v4/parser" +) + +const ( + testProtoContent = `syntax = "proto3"; package hello; message Hello {}` +) + +func TestParseProtoFilesContents(t *testing.T) { + type args struct { + protoFiles []*v1.ProtoFile + } + tests := []struct { + name string + args args + want []*model.ParsedProtoFile + wantErr bool + }{ + { + name: "Parse proto files", + args: args{ + protoFiles: []*v1.ProtoFile{ + { + Filename: "hello/test.proto", + Content: testProtoContent, + }, + }, + }, + want: []*model.ParsedProtoFile{ + { + Filename: "hello/test.proto", + Proto: &unordered.Proto{ + Syntax: &parser.Syntax{ + ProtobufVersion: "proto3", + }, + ProtoBody: &unordered.ProtoBody{ + Packages: []*parser.Package{ + { + Name: "hello", + }, + }, + Messages: []*unordered.Message{ + { + MessageName: "Hello", + }, + }, + }, + }, + ProtoJson: `{"Syntax":{"ProtobufVersion":"proto3","ProtobufVersionQuote":"\"proto3\"","Comments":null,"InlineComment":null,"Meta":{"Pos":{"Filename":"","Offset":0,"Line":1,"Column":1},"LastPos":{"Filename":"","Offset":17,"Line":1,"Column":18}}},"ProtoBody":{"Imports":null,"Packages":[{"Name":"hello","Comments":null,"InlineComment":null,"Meta":{"Pos":{"Filename":"","Offset":19,"Line":1,"Column":20},"LastPos":{"Filename":"","Offset":32,"Line":1,"Column":33}}}],"Options":null,"Messages":[{"MessageName":"Hello","MessageBody":{"Fields":null,"Enums":null,"Messages":null,"Options":null,"Oneofs":null,"Maps":null,"Groups":null,"Reserves":null,"Extends":null,"EmptyStatements":null,"Extensions":null},"Comments":null,"InlineComment":null,"InlineCommentBehindLeftCurly":null,"Meta":{"Pos":{"Filename":"","Offset":34,"Line":1,"Column":35},"LastPos":{"Filename":"","Offset":49,"Line":1,"Column":50}}}],"Extends":null,"Enums":null,"Services":null,"EmptyStatements":null}}`, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseProtoFilesContents(tt.args.protoFiles) + if (err != nil) != tt.wantErr { + t.Errorf("ParseProtoFilesContents() error = %v, wantErr %v", err, tt.wantErr) + return + } + for i, g := range got { + if !reflect.DeepEqual(g.Filename, tt.want[i].Filename) { + t.Errorf("ParseProtoFilesContents() got = %v, want %v", g.Filename, tt.want[i].Filename) + } + if !reflect.DeepEqual(g.Proto.Syntax.ProtobufVersion, tt.want[i].Proto.Syntax.ProtobufVersion) { + t.Errorf("ParseProtoFilesContents() got = %v, want %v", g.Proto.Syntax.ProtobufVersion, tt.want[i].Proto.Syntax.ProtobufVersion) + } + if !reflect.DeepEqual(g.Proto.ProtoBody.Packages[0].Name, tt.want[i].Proto.ProtoBody.Packages[0].Name) { + t.Errorf("ParseProtoFilesContents() got = %v, want %v", g.Proto.ProtoBody.Packages[0].Name, tt.want[i].Proto.ProtoBody.Packages[0].Name) + } + if !reflect.DeepEqual(g.Proto.ProtoBody.Messages[0].MessageName, tt.want[i].Proto.ProtoBody.Messages[0].MessageName) { + t.Errorf("ParseProtoFilesContents() got = %v, want %v", g.Proto.ProtoBody.Messages[0].MessageName, tt.want[i].Proto.ProtoBody.Messages[0].MessageName) + } + if !reflect.DeepEqual(g.ProtoJson, tt.want[i].ProtoJson) { + t.Errorf("ParseProtoFilesContents() got = %v, want %v", g.ProtoJson, tt.want[i].ProtoJson) + } + } + }) + } +} diff --git a/migrations/20231204150434_proto_parsed_data.sql b/migrations/20231204150434_proto_parsed_data.sql new file mode 100644 index 0000000..34eaf27 --- /dev/null +++ b/migrations/20231204150434_proto_parsed_data.sql @@ -0,0 +1,35 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE tags ADD COLUMN is_processed BOOLEAN NOT NULL DEFAULT FALSE; + +CREATE TABLE IF NOT EXISTS proto_parsed_data ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tag_id UUID NOT NULL, + filename TEXT NOT NULL, + json JSONB NOT NULL, + UNIQUE (tag_id, filename) + ); + +CREATE TABLE IF NOT EXISTS tag_meta ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tag_id UUID NOT NULL, + meta JSONB NOT NULL, + UNIQUE (tag_id) + ); + +CREATE INDEX IF NOT EXISTS idx_tags_is_processed ON tags (is_processed); +CREATE INDEX IF NOT EXISTS idx_proto_parsed_data_tag_id ON proto_parsed_data (tag_id); +CREATE INDEX IF NOT EXISTS idx_tag_meta_tag_id ON tag_meta (tag_id); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX IF EXISTS idx_tags_is_processed; +ALTER TABLE tags DROP COLUMN is_processed; + +DROP INDEX IF EXISTS idx_proto_parsed_data_tag_id; +DROP INDEX IF EXISTS idx_tag_meta_tag_id; + +DROP TABLE IF EXISTS proto_parsed_data; +DROP TABLE IF EXISTS tag_meta; +-- +goose StatementEnd