Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion cmd/rewards-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ import (

_ "github.com/lib/pq"

pb_att "github.com/DIMO-Network/attestation-api/pkg/grpc"
pb_devices "github.com/DIMO-Network/devices-api/pkg/grpc"
pb_fetch "github.com/DIMO-Network/fetch-api/pkg/grpc"

_ "github.com/DIMO-Network/rewards-api/docs"
"github.com/DIMO-Network/rewards-api/internal/api"
"github.com/DIMO-Network/rewards-api/internal/config"
"github.com/DIMO-Network/rewards-api/internal/controllers"

pb_tesla "github.com/DIMO-Network/tesla-oracle/pkg/grpc"

"github.com/DIMO-Network/rewards-api/internal/database"
"github.com/DIMO-Network/rewards-api/internal/services"
"github.com/DIMO-Network/rewards-api/internal/services/ch"
"github.com/DIMO-Network/rewards-api/internal/services/identity"
"github.com/DIMO-Network/rewards-api/internal/services/mobileapi"
"github.com/DIMO-Network/rewards-api/internal/services/vinvc"
"github.com/DIMO-Network/rewards-api/pkg/date"
pb_rewards "github.com/DIMO-Network/shared/api/rewards"
"github.com/DIMO-Network/shared/pkg/db"
Expand Down Expand Up @@ -193,6 +198,29 @@ func main() {
if err := database.MigrateDatabase(logger, &settings.DB, command, "migrations"); err != nil {
logger.Fatal().Err(err).Msg("Failed to run migration.")
}
case "ensure-vin":

chClient, err := ch.NewClient(&settings)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create ClickHouse client.")
}
attConn, err := grpc.NewClient(settings.AttestationGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create fetch-api connection.")
}
defer attConn.Close()

attClient := pb_att.NewAttestationServiceClient(attConn)
client := &vinvc.EnsureClient{
AttClient: attClient,
CHClient: chClient,
Logger: &logger,
}
err = client.EnsureAll()
if err != nil {
logger.Fatal().Err(err).Msg("Failed to ensure.")

}
case "calculate":
var week int
if len(os.Args) == 2 {
Expand Down Expand Up @@ -253,7 +281,23 @@ func main() {

teslaClient := pb_tesla.NewTeslaOracleClient(teslaConn)

baselineRewardClient := services.NewBaselineRewardService(&settings, transferService, chClient, deviceClient, identClient, week, &logger, teslaClient)
attConn, err := grpc.NewClient(settings.AttestationGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create fetch-api connection.")
}
defer attConn.Close()

attClient := pb_att.NewAttestationServiceClient(attConn)

fetchConn, err := grpc.NewClient(settings.FetchAPIGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create devices-api connection.")
}
defer fetchConn.Close()

fetchClient := pb_fetch.NewFetchServiceClient(fetchConn)

baselineRewardClient := services.NewBaselineRewardService(&settings, transferService, chClient, deviceClient, identClient, week, &logger, teslaClient, attClient, fetchClient)

if err := baselineRewardClient.BaselineIssuance(); err != nil {
logger.Fatal().Err(err).Int("issuanceWeek", week).Msg("Failed to calculate and/or transfer rewards.")
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ replace github.com/ericlagergren/decimal => github.com/ericlagergren/decimal v0.

require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.3
github.com/DIMO-Network/attestation-api v0.1.9
github.com/DIMO-Network/attestation-api v0.1.10
github.com/DIMO-Network/clickhouse-infra v0.0.7
github.com/DIMO-Network/cloudevent v0.1.4
github.com/DIMO-Network/devices-api v1.41.8
github.com/DIMO-Network/fetch-api v0.0.14
github.com/DIMO-Network/model-garage v0.7.4
github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb
github.com/DIMO-Network/shared v1.0.7
github.com/DIMO-Network/tesla-oracle v0.4.18
github.com/IBM/sarama v1.46.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.40.3 h1:46jB4kKwVDUOnECpStKMVXxvR0Cg9z
github.com/ClickHouse/clickhouse-go/v2 v2.40.3/go.mod h1:qO0HwvjCnTB4BPL/k6EE3l4d9f/uF+aoimAhJX70eKA=
github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DIMO-Network/attestation-api v0.1.9 h1:bVm/cX4PBlpVPZFGzHzbtOvB0yRUHOto4swGocvRHlE=
github.com/DIMO-Network/attestation-api v0.1.9/go.mod h1:ivVn5+vDK5/hhNuL48Wi8M8BdeNpIapHqIPgJjIbzZY=
github.com/DIMO-Network/attestation-api v0.1.10 h1:HdKGLlYTjuoX/m2srRmkk+K5aYT6pm4aHz4nL0u+hi8=
github.com/DIMO-Network/attestation-api v0.1.10/go.mod h1:3R46KVOFa7WmrxR99aX68dZw2Kl6ffle1yFCSx0gtNg=
github.com/DIMO-Network/clickhouse-infra v0.0.7 h1:TAsjkFFKu3D5Xg6dwBcRBryjCVSlXsNjVbTwJ4UDlTg=
github.com/DIMO-Network/clickhouse-infra v0.0.7/go.mod h1:XS80lhSJNWBWGgZ+m4j7++zFj1wAXfmtV2gJfhGlabQ=
github.com/DIMO-Network/cloudevent v0.1.4 h1:c6Sq4CyHt05V8OtnEXekUCRGfVuR1pFkJevfiKt1sYM=
Expand All @@ -22,8 +22,6 @@ github.com/DIMO-Network/fetch-api v0.0.14 h1:Pzl/q88gt78DPeXyeDhq0eFp7yIE4ZvuNi8
github.com/DIMO-Network/fetch-api v0.0.14/go.mod h1:88wlGSCliybeZ+XJyJkL8Cf/d8CTm8vQeB8obJNr3tY=
github.com/DIMO-Network/model-garage v0.7.4 h1:mFnGPPMxOWexeJe7LmykLlwIr5JgCC5C0giIssmZRio=
github.com/DIMO-Network/model-garage v0.7.4/go.mod h1:xtHRSvILRrC99CGavoaUFQT+hOxbghbTy5MYzT2KTtM=
github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb h1:oT0gCBPGBsGwO49UdCEYaCdDWrxfwk8AvKL+KzlY/ZY=
github.com/DIMO-Network/set v0.0.0-20250627202730-1145b5cbaecb/go.mod h1:rNyMOJr8tcQzIkGK/9LAbLNXBzVHtTYxqStwVh+T+ws=
github.com/DIMO-Network/shared v1.0.7 h1:LfSgsqJ6R7EUyfo2GTfuhrCpoDcweJqe7eVOa4j7Xbo=
github.com/DIMO-Network/shared v1.0.7/go.mod h1:lDHUKwwT2LW6Zvd42Nb33dXklRNTmfyOlbUNx2dQfGY=
github.com/DIMO-Network/tesla-oracle v0.4.18 h1:VRbdwWTJRv95NV9Ut/FfFAAGt9/cyF5mDe+xnGPY2vA=
Expand Down
1 change: 1 addition & 0 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Settings struct {
MobileAPIBaseURL string `yaml:"MOBILE_API_BASE_URL"`
StorageNodeDevLicense common.Address `yaml:"STORAGE_NODE_DEV_LICENSE"`
VINVCConcurrencyLimit int `yaml:"VINVC_CONCURRENCY_LIMIT"`
AttestationGRPCAddr string `yaml:"ATTESTATION_GRPC_ADDR"`

// These two are back-ups for VIN VCs.
DevicesAPIGRPCAddr string `yaml:"DEVICES_API_GRPC_ADDR"`
Expand Down
6 changes: 6 additions & 0 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ var ConnsByAddr = map[common.Address]ConnectionConfig{
Address: common.HexToAddress("0xc4035Fecb1cc906130423EF05f9C20977F643722"),
Points: 6000,
},
common.HexToAddress("0x8D8cDB2B26423c8fDbb27321aF20b4659Ce919fD"): {
LegacyID: "359fmv4U6YGP74oM5C4OBvVe0WK",
LegacyVendor: "Kaufmann",
Address: common.HexToAddress("0x8D8cDB2B26423c8fDbb27321aF20b4659Ce919fD"),
Points: 6000,
},
}
113 changes: 66 additions & 47 deletions internal/services/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@ package services

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"slices"
"time"

pb_att "github.com/DIMO-Network/attestation-api/pkg/grpc"
att_types "github.com/DIMO-Network/attestation-api/pkg/types"
"github.com/DIMO-Network/cloudevent"

pb_devices "github.com/DIMO-Network/devices-api/pkg/grpc"
"google.golang.org/grpc/status"

pb_fetch "github.com/DIMO-Network/fetch-api/pkg/grpc"
"google.golang.org/grpc/codes"

"github.com/DIMO-Network/rewards-api/internal/config"
"github.com/DIMO-Network/rewards-api/internal/constants"
"github.com/DIMO-Network/rewards-api/internal/services/ch"
"github.com/DIMO-Network/rewards-api/internal/services/identity"
"github.com/DIMO-Network/rewards-api/internal/storage"
"github.com/DIMO-Network/rewards-api/models"
"github.com/DIMO-Network/rewards-api/pkg/date"
"github.com/DIMO-Network/set"
pb_tesla "github.com/DIMO-Network/tesla-oracle/pkg/grpc"
"github.com/aarondl/null/v8"
"github.com/aarondl/sqlboiler/v4/boil"
Expand All @@ -25,8 +35,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type BaselineClient struct {
Expand All @@ -39,6 +48,8 @@ type BaselineClient struct {
FirstAutomatedWeek int
IdentityClient IdentityClient
teslaOracle TeslaClient
fetchClient pb_fetch.FetchServiceClient
attClient pb_att.AttestationServiceClient
}

type IdentityClient interface {
Expand Down Expand Up @@ -66,6 +77,8 @@ func NewBaselineRewardService(
week int,
logger *zerolog.Logger,
teslaOracle TeslaClient,
attClient pb_att.AttestationServiceClient,
fetchClient pb_fetch.FetchServiceClient,
) *BaselineClient {
return &BaselineClient{
TransferService: transferService,
Expand All @@ -77,6 +90,8 @@ func NewBaselineRewardService(
FirstAutomatedWeek: settings.FirstAutomatedWeek,
IdentityClient: stakeChecker,
teslaOracle: teslaOracle,
attClient: attClient,
fetchClient: fetchClient,
}
}

Expand All @@ -87,7 +102,7 @@ func (t *BaselineClient) assignPoints() error {
weekStart := date.NumToWeekStart(issuanceWeek)
weekEnd := date.NumToWeekEnd(issuanceWeek)

var vinsUsed set.Set[string]
vinUsedBy := make(map[string]int64)

t.Logger.Info().Msgf("Running job for issuance week %d, running from %s to %s", issuanceWeek, weekStart.Format(time.RFC3339), weekEnd.Format(time.RFC3339))

Expand Down Expand Up @@ -138,7 +153,7 @@ func (t *BaselineClient) assignPoints() error {
vd, err := t.IdentityClient.DescribeVehicle(uint64(device.TokenID))
if err != nil {
if errors.Is(err, identity.ErrNotFound) {
logger.Info().Msg("Vehicle was active during the week but was later deleted.")
logger.Info().Msg("Vehicle was active during the week but was later burned.")
continue
}
return fmt.Errorf("failed to describe vehicle %d: %w", device.TokenID, err)
Expand All @@ -153,72 +168,77 @@ func (t *BaselineClient) assignPoints() error {
RewardsReceiverEthereumAddress: null.StringFrom(vOwner.Hex()),
}

vin := ""

if ad := vd.AftermarketDevice; ad != nil {
conn, ok := constants.ConnsByMfrId[ad.Manufacturer.TokenID]
if ok && slices.Contains(device.Sources, conn.Address.Hex()) {
if conn, ok := constants.ConnsByMfrId[ad.Manufacturer.TokenID]; !ok {
logger.Warn().Msgf("Unrecognized aftermarket device manufacturer %d.", ad.Manufacturer.TokenID)
} else if slices.Contains(device.Sources, conn.Address.Hex()) {
thisWeek.RewardsReceiverEthereumAddress = null.StringFrom(ad.Beneficiary.Hex())

if vd.Owner != ad.Beneficiary {
logger.Info().Msgf("Sending tokens to beneficiary %s.", ad.Beneficiary)
}

thisWeek.AftermarketTokenID = types.NewNullDecimal(decimal.New(int64(ad.TokenID), 0)) //new(decimal.Big).SetUint64(uint64(ad.TokenID)))
thisWeek.AftermarketTokenID = types.NewNullDecimal(decimal.New(int64(ad.TokenID), 0))
thisWeek.AftermarketDevicePoints = int(conn.Points)
thisWeek.IntegrationIds = append(thisWeek.IntegrationIds, conn.LegacyID)
}
}

if sd := vd.SyntheticDevice; sd != nil {
conn, ok := constants.ConnsByAddr[sd.Connection.Address]
if ok && slices.Contains(device.Sources, conn.Address.Hex()) {
if conn, ok := constants.ConnsByAddr[sd.Connection.Address]; !ok {
logger.Warn().Msgf("Unrecognized synthetic device connection %s.", sd.Connection.Address.Hex())
} else if slices.Contains(device.Sources, conn.Address.Hex()) {
thisWeek.SyntheticDeviceID = null.IntFrom(sd.TokenID)
thisWeek.SyntheticDevicePoints = int(conn.Points)
thisWeek.IntegrationIds = append(thisWeek.IntegrationIds, conn.LegacyID)

// Yes, this is bad. Temporary.
if conn.LegacyVendor == "Tesla" {
vti, err := t.teslaOracle.GetVinByTokenId(ctx, &pb_tesla.GetVinByTokenIdRequest{TokenId: uint32(sd.TokenID)})
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
// logger.Info().Msg("Device was active during the week but was later deleted.")
continue
}
return err
}
vin = vti.Vin
}
}

}

if len(thisWeek.IntegrationIds) == 0 {
logger.Warn().Msg("All integrations sending signals failed on-chain checks.")
logger.Warn().Msg("All devices sending signals failed onchain checks.")
continue
}

if vin == "" {
vv, err := t.DevicesClient.GetVehicleByTokenIdFast(ctx, &pb_devices.GetVehicleByTokenIdFastRequest{TokenId: uint32(device.TokenID)})
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
// logger.Info().Msg("Device was active during the week but was later deleted.")
continue
}
return err
}
if vv.Vin == "" {
if _, err := t.attClient.EnsureVinVc(ctx, &pb_att.EnsureVinVcRequest{
TokenId: uint32(device.TokenID),
}); err != nil {
// TODO(elffjs): Hard to determined the nature of errors, they're all code = Unknown.
logger.Err(err).Msgf("Failed to ensure VIN attestation.")
continue
}

ce, err := t.fetchClient.GetLatestCloudEvent(ctx, &pb_fetch.GetLatestCloudEventRequest{
Options: &pb_fetch.SearchOptions{
Type: &wrapperspb.StringValue{Value: cloudevent.TypeAttestation},
DataVersion: &wrapperspb.StringValue{Value: "vin/v1.0"},
Subject: &wrapperspb.StringValue{Value: cloudevent.ERC721DID{ChainID: 137, ContractAddress: common.HexToAddress("0xbA5738a18d83D41847dfFbDC6101d37C69c9B0cF"), TokenID: big.NewInt(device.TokenID)}.String()},
},
})
if err != nil {
if status.Code(err) == codes.NotFound {
logger.Warn().Msg("No VIN attestation for vehicle.")
continue
}
vin = vv.Vin

return fmt.Errorf("failed to get fetch VIN attestation for vehicle %d: %w", device.TokenID, err)
}

if vinsUsed.Contains(vin) {
logger.Info().Msg("VIN already used in this rewards period.")
var cred att_types.Credential
if err := json.Unmarshal(ce.CloudEvent.Data, &cred); err != nil {
logger.Err(err).Msg("Couldn't parse VIN attestation data.")
continue
}

var vs att_types.VINSubject
if err := json.Unmarshal(cred.CredentialSubject, &vs); err != nil {
logger.Err(err).Msg("Couldn't parse VIN attestation subject.")
continue
}

vin := vs.VehicleIdentificationNumber
logger.Info().Msgf("VIN is %s.", vin)

if vehicleID, ok := vinUsedBy[vin]; ok {
logger.Warn().Msgf("VIN already used by vehicle %d in this rewards period.", vehicleID)
continue
}

vinsUsed.Add(vin)
vinUsedBy[vin] = device.TokenID

// Streak rewards.
streakInput := StreakInput{
Expand All @@ -236,7 +256,6 @@ func (t *BaselineClient) assignPoints() error {
stakePoints := 0
if vd.Stake != nil && weekEnd.Before(vd.Stake.EndsAt) {
stakePoints = vd.Stake.Points
logger.Info().Msgf("Adding %d points from staking.", stakePoints)
}

setStreakFields(thisWeek, streak, stakePoints)
Expand Down
Loading
Loading