Skip to content

Commit

Permalink
feat: archivistactl import
Browse files Browse the repository at this point in the history
Introduces a new `archivistactl` import

It allows Archivista users to import DSSE Envelopes directly to the
Archivista database.

The feature allows direct import, which can help importing huge amount
of data as it use concurrency (go routines) to process.

Signed-off-by: Kairo Araujo <[email protected]>
  • Loading branch information
kairoaraujo committed Aug 20, 2024
1 parent 898c419 commit d8ff65a
Showing 1 changed file with 198 additions and 0 deletions.
198 changes: 198 additions & 0 deletions cmd/archivistactl/cmd/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2024 The Archivista Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"sync"

"github.com/edwarnicke/gitoid"
"github.com/in-toto/archivista/ent"
"github.com/in-toto/archivista/pkg/metadatastorage/sqlstore"
"github.com/in-toto/go-witness/dsse"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
importCmd = &cobra.Command{
Use: "import",
Short: "import dsses to the Archivista DB server",
SilenceUsage: true,
Args: cobra.MaximumNArgs(2),
RunE: importDsse,
}
)

func init() {
rootCmd.AddCommand(importCmd)
importCmd.PersistentFlags().StringP("from-dir", "", "", "Directory to import from. Example: /path/to/directory")
importCmd.PersistentFlags().StringP("db-uri", "", "", "Database URI to import to. Supported schemes: mysql, psql. Example: mysql://user:password@localhost:3306/testify")
importCmd.PersistentFlags().IntP("max-concurrent", "", 3, "Maximum number of concurrent imports.")
err := importCmd.MarkPersistentFlagRequired("db-uri")
cobra.CheckErr(err)
}

func walkDir(dir string) (<-chan string, <-chan error) {
ch := make(chan string)
errCh := make(chan error, 1)

Check warning on line 57 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L55-L57

Added lines #L55 - L57 were not covered by tests

go func() {
defer close(ch)
defer close(errCh)

Check warning on line 61 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L59-L61

Added lines #L59 - L61 were not covered by tests

err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err

Check warning on line 65 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L63-L65

Added lines #L63 - L65 were not covered by tests
}
if !info.IsDir() {
ch <- path

Check warning on line 68 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}
return nil

Check warning on line 70 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L70

Added line #L70 was not covered by tests
})

if err != nil {
errCh <- err

Check warning on line 74 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}
}()

return ch, errCh

Check warning on line 78 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L78

Added line #L78 was not covered by tests
}

func dbClient(dbURI string) (*ent.Client, error) {

Check warning on line 81 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L81

Added line #L81 was not covered by tests
// Verify that we can connect to the DB
var (
scheme string
uri string
)

Check warning on line 86 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L83-L86

Added lines #L83 - L86 were not covered by tests

purl, err := url.Parse(dbURI)
if err != nil {
return nil, err

Check warning on line 90 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L88-L90

Added lines #L88 - L90 were not covered by tests
}

switch strings.ToUpper(purl.Scheme) {
case "MYSQL":
scheme = "MYSQL"
uri = purl.User.String() + "@" + purl.Host + purl.Path
case "PSQL":
scheme = "PSQL"
uri = dbURI
default:
return nil, fmt.Errorf("unsupported database scheme %s", purl.Scheme)

Check warning on line 101 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L93-L101

Added lines #L93 - L101 were not covered by tests
}

// TODO:
// Define MaxIdleConns, MaxOpenConns, ConnMaxLifetime as custom parameters
entClient, err := sqlstore.NewEntClient(
scheme,
uri,
)
if err != nil {
return nil, err

Check warning on line 111 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L106-L111

Added lines #L106 - L111 were not covered by tests
}

return entClient, nil

Check warning on line 114 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L114

Added line #L114 was not covered by tests
}

func importFile(path string, sqlStore *sqlstore.Store, maxConcurrent int) {
fpaths, _ := walkDir(path)
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent) // Buffered channel acting as a semaphore

Check warning on line 120 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L117-L120

Added lines #L117 - L120 were not covered by tests

for fpath := range fpaths {
sem <- struct{}{} // Acquire a token before starting a new Goroutine
wg.Add(1)
go func(fpath string) {
defer wg.Done()
defer func() { <-sem }() // Release the token when done

Check warning on line 127 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L122-L127

Added lines #L122 - L127 were not covered by tests

fmt.Println("\nImporting file:", fpath)
file, err := os.ReadFile(fpath)
if err != nil {
fmt.Println("Skipping file: "+fpath+" cannot read file", fpath)
return

Check warning on line 133 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L129-L133

Added lines #L129 - L133 were not covered by tests
}

envelope := &dsse.Envelope{}
if err := json.Unmarshal(file, envelope); err != nil {
fmt.Printf("Skipping file: %s cannot open %s as DSSE Envelope\n", fpath, fpath)
return

Check warning on line 139 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L136-L139

Added lines #L136 - L139 were not covered by tests
}
if envelope.PayloadType != "" {
ngitoid, err := gitoid.New(bytes.NewReader(file), gitoid.WithContentLength(int64(len(file))), gitoid.WithSha256())
if err != nil {
fmt.Println("Skipping file: "+fpath+" cannot generate valid GitOID", fpath)
return

Check warning on line 145 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L141-L145

Added lines #L141 - L145 were not covered by tests
}
err = sqlStore.Store(context.Background(), ngitoid.String(), file)
if err != nil {

Check warning on line 148 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L147-L148

Added lines #L147 - L148 were not covered by tests
// if failed due to duplicate entry, skip
if strings.Contains(err.Error(), "Duplicate entry") {
fmt.Println("Skipping file: " + fpath + " cannot store duplicated entry")
} else {
fmt.Println("Skipping file: "+fpath+" failed to import.", err)

Check warning on line 153 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L150-L153

Added lines #L150 - L153 were not covered by tests
}
return

Check warning on line 155 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L155

Added line #L155 was not covered by tests
}
}
fmt.Println("Successfully imported", fpath)

Check warning on line 158 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L158

Added line #L158 was not covered by tests
}(fpath)
}

// Wait for all Goroutines to finish
wg.Wait()

Check warning on line 163 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L163

Added line #L163 was not covered by tests
}

func importDsse(ccmd *cobra.Command, args []string) error {
logrus.SetLevel(logrus.FatalLevel) // Set the log level to Fatal to suppress lower-level logs

Check warning on line 167 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L166-L167

Added lines #L166 - L167 were not covered by tests

sourceDir, err := ccmd.Flags().GetString("from-dir")
if err != nil {
return err

Check warning on line 171 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L169-L171

Added lines #L169 - L171 were not covered by tests
}

dbURI, err := ccmd.Flags().GetString("db-uri")
if err != nil {
return err

Check warning on line 176 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L174-L176

Added lines #L174 - L176 were not covered by tests
}

ec, err := dbClient(dbURI)
if err != nil {
return err

Check warning on line 181 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L179-L181

Added lines #L179 - L181 were not covered by tests
}

sqlStore, _, err := sqlstore.New(context.Background(), ec)
if err != nil {
return err

Check warning on line 186 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L184-L186

Added lines #L184 - L186 were not covered by tests
}

max, err := ccmd.Flags().GetInt("max-concurrent")
if err != nil {
fmt.Println("Failed to get max-concurrent flag", err)

Check warning on line 191 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L189-L191

Added lines #L189 - L191 were not covered by tests
}
fmt.Print("\nImporting DSSes from folder", sourceDir, " to the database server")
fmt.Print("\nMax concurrent imports: ", max, "\n\n")
importFile(sourceDir, sqlStore, max)

Check warning on line 195 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L193-L195

Added lines #L193 - L195 were not covered by tests

return nil

Check warning on line 197 in cmd/archivistactl/cmd/import.go

View check run for this annotation

Codecov / codecov/patch

cmd/archivistactl/cmd/import.go#L197

Added line #L197 was not covered by tests
}

0 comments on commit d8ff65a

Please sign in to comment.