Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add user storage threshold #975

Open
wants to merge 25 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
aea8308
added user storage threshold function
LucRoy May 8, 2023
1c97dd5
added bytes to TB utilility method
LucRoy May 8, 2023
ec15b85
moving utilization query to own table
LucRoy May 9, 2023
0a960e2
added users storage capacity struct
LucRoy May 9, 2023
2bdd650
added http error for reached threshold limit
LucRoy May 9, 2023
6a8d8df
added limit theshold check in handleAdd
LucRoy May 9, 2023
54d8473
added default for soft / hard limit
LucRoy May 9, 2023
0facfda
added utility methods for calculating body size
LucRoy May 9, 2023
3a0b529
added capacity checks for /add-car, /add-ipfs and /create
LucRoy May 9, 2023
8231f11
added comments for default values
LucRoy May 9, 2023
278a2ea
introduced lastSync for forcing size refresh after 24h
LucRoy May 10, 2023
e494d20
added rest api endpoint for fetching user utilization stats
LucRoy May 10, 2023
6dff853
fixed merge conficts
LucRoy May 10, 2023
1785d6a
fixed merge conficts
LucRoy May 10, 2023
42a3d0e
added better error handling for gosec
LucRoy May 10, 2023
08b9bad
change io.reader to use content length for calculating request size
LucRoy May 11, 2023
1a87d30
added validate only function for pinning op
LucRoy May 11, 2023
aefa166
cleanup storage threshold logic
LucRoy May 17, 2023
21180d2
change user size calculation to addObjectsToDatabase() once pinned
LucRoy May 17, 2023
4de9f20
cleanup comments
LucRoy May 17, 2023
abef007
fixed error handling on no records founds
LucRoy May 17, 2023
693f983
added new db table to migrateSchemas()
LucRoy May 17, 2023
71e1f3a
change threshold field to have _bytes at the end for clarification
LucRoy May 23, 2023
c0a4580
updated cut over date for storage calculation
LucRoy May 23, 2023
3749074
docs: update swagger docs
LucRoy May 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) {
user.PUT("/password", util.WithUser(s.handleUserChangePassword))
user.PUT("/address", util.WithUser(s.handleUserChangeAddress))
user.GET("/stats", util.WithUser(s.handleGetUserStats))
user.GET("/utilization", util.WithUser(s.handleGetUserUtilization))

userMiner := user.Group("/miner")
userMiner.POST("/claim", util.WithUser(s.handleUserClaimMiner))
Expand Down
78 changes: 78 additions & 0 deletions api/v1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,20 @@ func (s *apiV1) handleAddCar(c echo.Context, u *util.User) error {
return s.redirectContentAdding(c, u)
}

// Get user storage capacity
usc, err := s.getUserStorageCapacity(u)
if err != nil {
return err
}

if !usc.ValidateThreshold() {
return &util.HttpError{
Code: http.StatusBadRequest,
Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT,
Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)),
}
}

// if splitting is disabled and uploaded content size is greater than content size limit
// reject the upload, as it will only get stuck and deals will never be made for it
// if !u.FlagSplitContent() {
Expand Down Expand Up @@ -492,6 +506,25 @@ func (s *apiV1) loadCar(ctx context.Context, bs blockstore.Blockstore, r io.Read
return car.LoadCar(ctx, bs, r)
}

func (s *apiV1) getUserStorageCapacity(user *util.User) (*util.UsersStorageCapacity, error) {
var usc *util.UsersStorageCapacity
err := s.db.First(&usc, "user_id = ?", user.ID).Error

if err != nil || usc.IsSyncNeeded() {
var usage util.Utilization
if err := s.db.Raw(`SELECT (SELECT SUM(size) FROM contents where user_id = ? AND created_at >= ? AND NOT aggregate AND active AND deleted_at IS NULL) as total_size`, user.ID, util.CutOverUtilizationDate).
Scan(&usage).Error; err != nil {
return usc, err
}
usc.UserId = user.ID
usc.Size = usage.TotalSize
usc.LastSyncAt = time.Now()
s.db.Save(&usc)
}

return usc, nil
}

// handleAdd godoc
// @Summary Add new content
// @Description This endpoint is used to upload new content.
Expand Down Expand Up @@ -539,6 +572,20 @@ func (s *apiV1) handleAdd(c echo.Context, u *util.User) error {
return err
}

// Get user storage capacity
usc, err := s.getUserStorageCapacity(u)
if err != nil {
return err
}

if !usc.ValidateThreshold() {
return &util.HttpError{
Code: http.StatusBadRequest,
Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT,
Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)),
}
}

// if splitting is disabled and uploaded content size is greater than content size limit
// reject the upload, as it will only get stuck and deals will never be made for it
if !u.FlagSplitContent() && mpf.Size > s.cfg.Content.MaxSize {
Expand Down Expand Up @@ -2852,6 +2899,23 @@ func (s *apiV1) handleGetUserStats(c echo.Context, u *util.User) error {
return c.JSON(http.StatusOK, stats)
}

// handleGetUserStats godoc
// @Summary Gets User Utilization Stats
// @Description This endpoint is used to get utilization stats for the current user.
// @Tags User
// @Produce json
// @Success 200 {object} string
// @Failure 400 {object} util.HttpError
// @Failure 500 {object} util.HttpError
// @Router /user/utilization [get]
func (s *apiV1) handleGetUserUtilization(c echo.Context, u *util.User) error {
usc, err := s.getUserStorageCapacity(u)
if err != nil {
return err
}
return c.JSON(http.StatusOK, usc)
}

func (s *apiV1) newAuthTokenForUser(user *util.User, expiry time.Time, perms []string, label string, isSession bool) (*util.AuthToken, error) {
if len(perms) > 1 {
return nil, fmt.Errorf("invalid perms")
Expand Down Expand Up @@ -4514,6 +4578,20 @@ func (s *apiV1) handleCreateContent(c echo.Context, u *util.User) error {
return err
}

// Get user storage capacity
usc, err := s.getUserStorageCapacity(u)
if err != nil {
return err
}

if !usc.ValidateThreshold() {
return &util.HttpError{
Code: http.StatusBadRequest,
Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT,
Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)),
}
}

rootCID, err := cid.Decode(req.Root)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions api/v1/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,20 @@ func (s *apiV1) handleAddPin(c echo.Context, u *util.User) error {
overwrite = true
}

// Get user storage capacity
usc, err := s.getUserStorageCapacity(u)
if err != nil {
return err
}

if !usc.ValidateThreshold() {
return &util.HttpError{
Code: http.StatusBadRequest,
Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT,
Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)),
}
}

ignoreDuplicates := false
if c.QueryParam("ignore-dupes") == "true" {
ignoreDuplicates = true
Expand Down
32 changes: 32 additions & 0 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3291,6 +3291,38 @@ const docTemplate = `{
}
}
},
"/user/utilization": {
"get": {
"description": "This endpoint is used to get utilization stats for the current user.",
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Gets User Utilization Stats",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/util.HttpError"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/util.HttpError"
}
}
}
}
},
"/viewer": {
"get": {
"description": "This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc.",
Expand Down
32 changes: 32 additions & 0 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3284,6 +3284,38 @@
}
}
},
"/user/utilization": {
"get": {
"description": "This endpoint is used to get utilization stats for the current user.",
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Gets User Utilization Stats",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/util.HttpError"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/util.HttpError"
}
}
}
}
},
"/viewer": {
"get": {
"description": "This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc.",
Expand Down
21 changes: 21 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2549,6 +2549,27 @@ paths:
summary: Get stats for the current user
tags:
- User
/user/utilization:
get:
description: This endpoint is used to get utilization stats for the current user.
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/util.HttpError'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/util.HttpError'
summary: Gets User Utilization Stats
tags:
- User
/viewer:
get:
description: This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc.
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func migrateSchemas(db *gorm.DB) error {
&model.DealQueueTracker{},
&model.SplitQueue{},
&model.SplitQueueTracker{},
&util.UsersStorageCapacity{},
); err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions pinner/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ func (m *manager) addObjectsToDatabase(ctx context.Context, cont *util.Content,
return xerrors.Errorf("failed to update content in database: %w", err)
}

var usc *util.UsersStorageCapacity
err := tx.First(&usc, "user_id = ?", cont.UserID).Error
if err != nil {
usc.UserId = cont.UserID
usc.Size = 0
}

usc.Size += contSize
tx.Save(&usc)

// if content can be staged, stage it
if contSize < m.cfg.Content.MinSize {
return m.stgZoneQueueMgr.QueueContent(cont, tx, false)
Expand Down
1 change: 1 addition & 0 deletions util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
ERR_CONTENT_ADDING_DISABLED = "ERR_CONTENT_ADDING_DISABLED"
ERR_INVALID_INPUT = "ERR_INVALID_INPUT"
ERR_CONTENT_SIZE_OVER_LIMIT = "ERR_CONTENT_SIZE_OVER_LIMIT"
ERR_USER_REACHED_STORAGE_TRESHOLD = "ERR_USER_REACHED_STORAGE_TRESHOLD"
ERR_PEERING_PEERS_ADD_ERROR = "ERR_PEERING_PEERS_ADD_ERROR"
ERR_PEERING_PEERS_REMOVE_ERROR = "ERR_PEERING_PEERS_REMOVE_ERROR"
ERR_PEERING_PEERS_START_ERROR = "ERR_PEERING_PEERS_START_ERROR"
Expand Down
5 changes: 5 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,8 @@ func ToMultiAddress(addr string) (multiaddr.Multiaddr, error) {
}
return a, nil
}

func BytesToTB(bytes int64) float64 {
tb := float64(bytes) / (1024 * 1024 * 1024 * 1024)
return tb
}
35 changes: 35 additions & 0 deletions util/users_storage_capacity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"gorm.io/gorm"
"time"
)

type UsersStorageCapacity struct {
gorm.Model

UserId uint `json:"user_id"`
Size int64 `json:"size" gorm:"default:0"`
SoftLimit int64 `json:"soft_limit_bytes" gorm:"default:1319413953331"` // Hardlimit*.8
HardLimit int64 `json:"hard_limit_bytes" gorm:"default:1649267441664"` // 1.5TB
LastSyncAt time.Time `json:"last_sync_at"`
}

type Utilization struct {
TotalSize int64
}

// CutOverUtilizationDate All content uploaded pass this date will count toward your user storage capacity
const CutOverUtilizationDate = "2023-05-26 00:00:00"
const SyncRefreshInHours = 24

func (usc *UsersStorageCapacity) ValidateThreshold() bool {
return usc.Size <= usc.HardLimit
}

func (usc *UsersStorageCapacity) IsSyncNeeded() bool {
now := time.Now().UTC()
duration := now.Sub(usc.LastSyncAt.UTC())
refresh := SyncRefreshInHours * time.Hour
return duration >= refresh
}