From 364798861c6c641358b8876a97ff4b89ead074c6 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Sun, 9 Nov 2025 12:34:43 -0500 Subject: [PATCH 1/2] Use VIN attestations --- cmd/rewards-api/main.go | 22 ++++++- go.mod | 3 +- go.sum | 6 +- internal/config/settings.go | 1 + internal/constants/constants.go | 6 ++ internal/services/rewards.go | 113 +++++++++++++++++++------------- 6 files changed, 97 insertions(+), 54 deletions(-) diff --git a/cmd/rewards-api/main.go b/cmd/rewards-api/main.go index f09c8a0..3cd65ce 100644 --- a/cmd/rewards-api/main.go +++ b/cmd/rewards-api/main.go @@ -15,11 +15,15 @@ import ( _ "github.com/lib/pq" + pb_att "github.com/DIMO-Network/attestation-api/pkg/grpc" pb_devices "github.com/DIMO-Network/devices-api/pkg/grpc" + pb_fetch "github.com/DIMO-Network/fetch-api/pkg/grpc" + _ "github.com/DIMO-Network/rewards-api/docs" "github.com/DIMO-Network/rewards-api/internal/api" "github.com/DIMO-Network/rewards-api/internal/config" "github.com/DIMO-Network/rewards-api/internal/controllers" + pb_tesla "github.com/DIMO-Network/tesla-oracle/pkg/grpc" "github.com/DIMO-Network/rewards-api/internal/database" @@ -253,7 +257,23 @@ func main() { teslaClient := pb_tesla.NewTeslaOracleClient(teslaConn) - baselineRewardClient := services.NewBaselineRewardService(&settings, transferService, chClient, deviceClient, identClient, week, &logger, teslaClient) + attConn, err := grpc.NewClient(settings.AttestationGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logger.Fatal().Err(err).Msg("Failed to create fetch-api connection.") + } + defer attConn.Close() + + attClient := pb_att.NewAttestationServiceClient(attConn) + + fetchConn, err := grpc.NewClient(settings.FetchAPIGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logger.Fatal().Err(err).Msg("Failed to create devices-api connection.") + } + defer fetchConn.Close() + + fetchClient := pb_fetch.NewFetchServiceClient(fetchConn) + + baselineRewardClient := services.NewBaselineRewardService(&settings, transferService, chClient, deviceClient, identClient, week, &logger, teslaClient, attClient, fetchClient) if err := baselineRewardClient.BaselineIssuance(); err != nil { logger.Fatal().Err(err).Int("issuanceWeek", week).Msg("Failed to calculate and/or transfer rewards.") diff --git a/go.mod b/go.mod index af9894b..9f4cb02 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,12 @@ replace github.com/ericlagergren/decimal => github.com/ericlagergren/decimal v0. require ( github.com/ClickHouse/clickhouse-go/v2 v2.40.3 - github.com/DIMO-Network/attestation-api v0.1.9 + github.com/DIMO-Network/attestation-api v0.1.10 github.com/DIMO-Network/clickhouse-infra v0.0.7 github.com/DIMO-Network/cloudevent v0.1.4 github.com/DIMO-Network/devices-api v1.41.8 github.com/DIMO-Network/fetch-api v0.0.14 github.com/DIMO-Network/model-garage v0.7.4 - github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb github.com/DIMO-Network/shared v1.0.7 github.com/DIMO-Network/tesla-oracle v0.4.18 github.com/IBM/sarama v1.46.1 diff --git a/go.sum b/go.sum index e13d1cc..6141085 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.40.3 h1:46jB4kKwVDUOnECpStKMVXxvR0Cg9z github.com/ClickHouse/clickhouse-go/v2 v2.40.3/go.mod h1:qO0HwvjCnTB4BPL/k6EE3l4d9f/uF+aoimAhJX70eKA= github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM= github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DIMO-Network/attestation-api v0.1.9 h1:bVm/cX4PBlpVPZFGzHzbtOvB0yRUHOto4swGocvRHlE= -github.com/DIMO-Network/attestation-api v0.1.9/go.mod h1:ivVn5+vDK5/hhNuL48Wi8M8BdeNpIapHqIPgJjIbzZY= +github.com/DIMO-Network/attestation-api v0.1.10 h1:HdKGLlYTjuoX/m2srRmkk+K5aYT6pm4aHz4nL0u+hi8= +github.com/DIMO-Network/attestation-api v0.1.10/go.mod h1:3R46KVOFa7WmrxR99aX68dZw2Kl6ffle1yFCSx0gtNg= github.com/DIMO-Network/clickhouse-infra v0.0.7 h1:TAsjkFFKu3D5Xg6dwBcRBryjCVSlXsNjVbTwJ4UDlTg= github.com/DIMO-Network/clickhouse-infra v0.0.7/go.mod h1:XS80lhSJNWBWGgZ+m4j7++zFj1wAXfmtV2gJfhGlabQ= github.com/DIMO-Network/cloudevent v0.1.4 h1:c6Sq4CyHt05V8OtnEXekUCRGfVuR1pFkJevfiKt1sYM= @@ -22,8 +22,6 @@ github.com/DIMO-Network/fetch-api v0.0.14 h1:Pzl/q88gt78DPeXyeDhq0eFp7yIE4ZvuNi8 github.com/DIMO-Network/fetch-api v0.0.14/go.mod h1:88wlGSCliybeZ+XJyJkL8Cf/d8CTm8vQeB8obJNr3tY= github.com/DIMO-Network/model-garage v0.7.4 h1:mFnGPPMxOWexeJe7LmykLlwIr5JgCC5C0giIssmZRio= github.com/DIMO-Network/model-garage v0.7.4/go.mod h1:xtHRSvILRrC99CGavoaUFQT+hOxbghbTy5MYzT2KTtM= -github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb h1:oT0gCBPGBsGwO49UdCEYaCdDWrxfwk8AvKL+KzlY/ZY= -github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb/go.mod h1:rNyMOJr8tcQzIkGK/9LAbLNXBzVHtTYxqStwVh+T+ws= github.com/DIMO-Network/shared v1.0.7 h1:LfSgsqJ6R7EUyfo2GTfuhrCpoDcweJqe7eVOa4j7Xbo= github.com/DIMO-Network/shared v1.0.7/go.mod h1:lDHUKwwT2LW6Zvd42Nb33dXklRNTmfyOlbUNx2dQfGY= github.com/DIMO-Network/tesla-oracle v0.4.18 h1:VRbdwWTJRv95NV9Ut/FfFAAGt9/cyF5mDe+xnGPY2vA= diff --git a/internal/config/settings.go b/internal/config/settings.go index 43f50d0..fa6257a 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -33,6 +33,7 @@ type Settings struct { MobileAPIBaseURL string `yaml:"MOBILE_API_BASE_URL"` StorageNodeDevLicense common.Address `yaml:"STORAGE_NODE_DEV_LICENSE"` VINVCConcurrencyLimit int `yaml:"VINVC_CONCURRENCY_LIMIT"` + AttestationGRPCAddr string `yaml:"ATTESTATION_GRPC_ADDR"` // These two are back-ups for VIN VCs. DevicesAPIGRPCAddr string `yaml:"DEVICES_API_GRPC_ADDR"` diff --git a/internal/constants/constants.go b/internal/constants/constants.go index b83493e..31b5e10 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -37,4 +37,10 @@ var ConnsByAddr = map[common.Address]ConnectionConfig{ Address: common.HexToAddress("0xc4035Fecb1cc906130423EF05f9C20977F643722"), Points: 6000, }, + common.HexToAddress("0x8D8cDB2B26423c8fDbb27321aF20b4659Ce919fD"): { + LegacyID: "359fmv4U6YGP74oM5C4OBvVe0WK", + LegacyVendor: "Kaufmann", + Address: common.HexToAddress("0x8D8cDB2B26423c8fDbb27321aF20b4659Ce919fD"), + Points: 6000, + }, } diff --git a/internal/services/rewards.go b/internal/services/rewards.go index dbbc3e7..8437ec4 100644 --- a/internal/services/rewards.go +++ b/internal/services/rewards.go @@ -3,12 +3,23 @@ package services import ( "context" + "encoding/json" "errors" "fmt" + "math/big" "slices" "time" + pb_att "github.com/DIMO-Network/attestation-api/pkg/grpc" + att_types "github.com/DIMO-Network/attestation-api/pkg/types" + "github.com/DIMO-Network/cloudevent" + pb_devices "github.com/DIMO-Network/devices-api/pkg/grpc" + "google.golang.org/grpc/status" + + pb_fetch "github.com/DIMO-Network/fetch-api/pkg/grpc" + "google.golang.org/grpc/codes" + "github.com/DIMO-Network/rewards-api/internal/config" "github.com/DIMO-Network/rewards-api/internal/constants" "github.com/DIMO-Network/rewards-api/internal/services/ch" @@ -16,7 +27,6 @@ import ( "github.com/DIMO-Network/rewards-api/internal/storage" "github.com/DIMO-Network/rewards-api/models" "github.com/DIMO-Network/rewards-api/pkg/date" - "github.com/DIMO-Network/set" pb_tesla "github.com/DIMO-Network/tesla-oracle/pkg/grpc" "github.com/aarondl/null/v8" "github.com/aarondl/sqlboiler/v4/boil" @@ -25,8 +35,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/rs/zerolog" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" ) type BaselineClient struct { @@ -39,6 +48,8 @@ type BaselineClient struct { FirstAutomatedWeek int IdentityClient IdentityClient teslaOracle TeslaClient + fetchClient pb_fetch.FetchServiceClient + attClient pb_att.AttestationServiceClient } type IdentityClient interface { @@ -66,6 +77,8 @@ func NewBaselineRewardService( week int, logger *zerolog.Logger, teslaOracle TeslaClient, + attClient pb_att.AttestationServiceClient, + fetchClient pb_fetch.FetchServiceClient, ) *BaselineClient { return &BaselineClient{ TransferService: transferService, @@ -77,6 +90,8 @@ func NewBaselineRewardService( FirstAutomatedWeek: settings.FirstAutomatedWeek, IdentityClient: stakeChecker, teslaOracle: teslaOracle, + attClient: attClient, + fetchClient: fetchClient, } } @@ -87,7 +102,7 @@ func (t *BaselineClient) assignPoints() error { weekStart := date.NumToWeekStart(issuanceWeek) weekEnd := date.NumToWeekEnd(issuanceWeek) - var vinsUsed set.Set[string] + vinUsedBy := make(map[string]int64) t.Logger.Info().Msgf("Running job for issuance week %d, running from %s to %s", issuanceWeek, weekStart.Format(time.RFC3339), weekEnd.Format(time.RFC3339)) @@ -138,7 +153,7 @@ func (t *BaselineClient) assignPoints() error { vd, err := t.IdentityClient.DescribeVehicle(uint64(device.TokenID)) if err != nil { if errors.Is(err, identity.ErrNotFound) { - logger.Info().Msg("Vehicle was active during the week but was later deleted.") + logger.Info().Msg("Vehicle was active during the week but was later burned.") continue } return fmt.Errorf("failed to describe vehicle %d: %w", device.TokenID, err) @@ -153,72 +168,77 @@ func (t *BaselineClient) assignPoints() error { RewardsReceiverEthereumAddress: null.StringFrom(vOwner.Hex()), } - vin := "" - if ad := vd.AftermarketDevice; ad != nil { - conn, ok := constants.ConnsByMfrId[ad.Manufacturer.TokenID] - if ok && slices.Contains(device.Sources, conn.Address.Hex()) { + if conn, ok := constants.ConnsByMfrId[ad.Manufacturer.TokenID]; !ok { + logger.Warn().Msgf("Unrecognized aftermarket device manufacturer %d.", ad.Manufacturer.TokenID) + } else if slices.Contains(device.Sources, conn.Address.Hex()) { thisWeek.RewardsReceiverEthereumAddress = null.StringFrom(ad.Beneficiary.Hex()) - - if vd.Owner != ad.Beneficiary { - logger.Info().Msgf("Sending tokens to beneficiary %s.", ad.Beneficiary) - } - - thisWeek.AftermarketTokenID = types.NewNullDecimal(decimal.New(int64(ad.TokenID), 0)) //new(decimal.Big).SetUint64(uint64(ad.TokenID))) + thisWeek.AftermarketTokenID = types.NewNullDecimal(decimal.New(int64(ad.TokenID), 0)) thisWeek.AftermarketDevicePoints = int(conn.Points) thisWeek.IntegrationIds = append(thisWeek.IntegrationIds, conn.LegacyID) } } if sd := vd.SyntheticDevice; sd != nil { - conn, ok := constants.ConnsByAddr[sd.Connection.Address] - if ok && slices.Contains(device.Sources, conn.Address.Hex()) { + if conn, ok := constants.ConnsByAddr[sd.Connection.Address]; !ok { + logger.Warn().Msgf("Unrecognized synthetic device connection %s.", sd.Connection.Address.Hex()) + } else if slices.Contains(device.Sources, conn.Address.Hex()) { thisWeek.SyntheticDeviceID = null.IntFrom(sd.TokenID) thisWeek.SyntheticDevicePoints = int(conn.Points) thisWeek.IntegrationIds = append(thisWeek.IntegrationIds, conn.LegacyID) - - // Yes, this is bad. Temporary. - if conn.LegacyVendor == "Tesla" { - vti, err := t.teslaOracle.GetVinByTokenId(ctx, &pb_tesla.GetVinByTokenIdRequest{TokenId: uint32(sd.TokenID)}) - if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { - // logger.Info().Msg("Device was active during the week but was later deleted.") - continue - } - return err - } - vin = vti.Vin - } } - } if len(thisWeek.IntegrationIds) == 0 { - logger.Warn().Msg("All integrations sending signals failed on-chain checks.") + logger.Warn().Msg("All devices sending signals failed onchain checks.") continue } - if vin == "" { - vv, err := t.DevicesClient.GetVehicleByTokenIdFast(ctx, &pb_devices.GetVehicleByTokenIdFastRequest{TokenId: uint32(device.TokenID)}) - if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { - // logger.Info().Msg("Device was active during the week but was later deleted.") - continue - } - return err - } - if vv.Vin == "" { + if _, err := t.attClient.EnsureVinVc(ctx, &pb_att.EnsureVinVcRequest{ + TokenId: uint32(device.TokenID), + }); err != nil { + // TODO(elffjs): Hard to determined the nature of errors, they're all code = Unknown. + logger.Err(err).Msgf("Failed to ensure VIN attestation.") + continue + } + + ce, err := t.fetchClient.GetLatestCloudEvent(ctx, &pb_fetch.GetLatestCloudEventRequest{ + Options: &pb_fetch.SearchOptions{ + Type: &wrapperspb.StringValue{Value: cloudevent.TypeAttestation}, + DataVersion: &wrapperspb.StringValue{Value: "vin/v1.0"}, + Subject: &wrapperspb.StringValue{Value: cloudevent.ERC721DID{ChainID: 137, ContractAddress: common.HexToAddress("0xbA5738a18d83D41847dfFbDC6101d37C69c9B0cF"), TokenID: big.NewInt(device.TokenID)}.String()}, + }, + }) + if err != nil { + if status.Code(err) == codes.NotFound { + logger.Warn().Msg("No VIN attestation for vehicle.") continue } - vin = vv.Vin + + return fmt.Errorf("failed to get fetch VIN attestation for vehicle %d: %w", device.TokenID, err) } - if vinsUsed.Contains(vin) { - logger.Info().Msg("VIN already used in this rewards period.") + var cred att_types.Credential + if err := json.Unmarshal(ce.CloudEvent.Data, &cred); err != nil { + logger.Err(err).Msg("Couldn't parse VIN attestation data.") + continue + } + + var vs att_types.VINSubject + if err := json.Unmarshal(cred.CredentialSubject, &vs); err != nil { + logger.Err(err).Msg("Couldn't parse VIN attestation subject.") + continue + } + + vin := vs.VehicleIdentificationNumber + logger.Info().Msgf("VIN is %s.", vin) + + if vehicleID, ok := vinUsedBy[vin]; ok { + logger.Warn().Msgf("VIN already used by vehicle %d in this rewards period.", vehicleID) continue } - vinsUsed.Add(vin) + vinUsedBy[vin] = device.TokenID // Streak rewards. streakInput := StreakInput{ @@ -236,7 +256,6 @@ func (t *BaselineClient) assignPoints() error { stakePoints := 0 if vd.Stake != nil && weekEnd.Before(vd.Stake.EndsAt) { stakePoints = vd.Stake.Points - logger.Info().Msgf("Adding %d points from staking.", stakePoints) } setStreakFields(thisWeek, streak, stakePoints) From 78934c294b7d9d61ba0deff56566e2d0b34effc3 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Sun, 9 Nov 2025 14:17:34 -0500 Subject: [PATCH 2/2] Add an ensure command --- cmd/rewards-api/main.go | 24 ++++++++++++++ internal/services/vinvc/ensure.go | 53 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 internal/services/vinvc/ensure.go diff --git a/cmd/rewards-api/main.go b/cmd/rewards-api/main.go index 3cd65ce..519ad18 100644 --- a/cmd/rewards-api/main.go +++ b/cmd/rewards-api/main.go @@ -31,6 +31,7 @@ import ( "github.com/DIMO-Network/rewards-api/internal/services/ch" "github.com/DIMO-Network/rewards-api/internal/services/identity" "github.com/DIMO-Network/rewards-api/internal/services/mobileapi" + "github.com/DIMO-Network/rewards-api/internal/services/vinvc" "github.com/DIMO-Network/rewards-api/pkg/date" pb_rewards "github.com/DIMO-Network/shared/api/rewards" "github.com/DIMO-Network/shared/pkg/db" @@ -197,6 +198,29 @@ func main() { if err := database.MigrateDatabase(logger, &settings.DB, command, "migrations"); err != nil { logger.Fatal().Err(err).Msg("Failed to run migration.") } + case "ensure-vin": + + chClient, err := ch.NewClient(&settings) + if err != nil { + logger.Fatal().Err(err).Msg("Failed to create ClickHouse client.") + } + attConn, err := grpc.NewClient(settings.AttestationGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logger.Fatal().Err(err).Msg("Failed to create fetch-api connection.") + } + defer attConn.Close() + + attClient := pb_att.NewAttestationServiceClient(attConn) + client := &vinvc.EnsureClient{ + AttClient: attClient, + CHClient: chClient, + Logger: &logger, + } + err = client.EnsureAll() + if err != nil { + logger.Fatal().Err(err).Msg("Failed to ensure.") + + } case "calculate": var week int if len(os.Args) == 2 { diff --git a/internal/services/vinvc/ensure.go b/internal/services/vinvc/ensure.go new file mode 100644 index 0000000..fe4a2cc --- /dev/null +++ b/internal/services/vinvc/ensure.go @@ -0,0 +1,53 @@ +package vinvc + +import ( + "context" + "fmt" + "time" + + pb "github.com/DIMO-Network/attestation-api/pkg/grpc" + "github.com/DIMO-Network/rewards-api/internal/services/ch" + "github.com/DIMO-Network/rewards-api/pkg/date" + "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" +) + +type EnsureClient struct { + AttClient pb.AttestationServiceClient + CHClient *ch.Client + Logger *zerolog.Logger +} + +func (e *EnsureClient) EnsureAll() error { + weekNum := date.GetWeekNum(time.Now()) + weekStart := date.NumToWeekStart(weekNum) + weekEnd := date.NumToWeekEnd(weekNum) + + e.Logger.Info().Msgf("Ensuring VIN attestations for week %d.", weekNum) + + t := time.Now() + actives, err := e.CHClient.DescribeActiveDevices(context.TODO(), weekStart, weekEnd) + if err != nil { + return fmt.Errorf("failed to list active vehicles: %w", err) + } + e.Logger.Info().Msgf("Activity query took %s. Found %d vehicles.", time.Since(t), len(actives)) + + g, ctx := errgroup.WithContext(context.Background()) + g.SetLimit(10) + + for _, v := range actives { + g.Go(func() error { + _, err := e.AttClient.EnsureVinVc(ctx, &pb.EnsureVinVcRequest{ + TokenId: uint32(v.TokenID), + }) + if err != nil { + e.Logger.Err(err).Int64("vehicleId", v.TokenID).Msg("Failed to ensure VIN attestation for vehicle.") + } + return nil + }) + } + + e.Logger.Info().Msgf("Ensure job finished.") + + return g.Wait() +}