Skip to content

Commit

Permalink
feat: Add context usage to libmirror push operations (#62)
Browse files Browse the repository at this point in the history
* feat: add context usage to libmirror push operations

Signed-off-by: Maxim Vasilenko <[email protected]>

---------

Signed-off-by: Maxim Vasilenko <[email protected]>
  • Loading branch information
mvasl authored Nov 25, 2024
1 parent 5ba39bb commit 183a751
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 58 deletions.
7 changes: 6 additions & 1 deletion internal/mirror/cmd/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pull

import (
"bufio"
"context"
"crypto/md5"
"fmt"
"io"
Expand Down Expand Up @@ -162,16 +163,20 @@ func pull(_ *cobra.Command, _ []string) error {
patch := mirrorCtx.SpecificVersion.Patch()
accessValidationTag = fmt.Sprintf("v%d.%d.%d", major, minor, patch)
}
if err := auth.ValidateReadAccessForImage(
readAccessTimeoutCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
if err := auth.ValidateReadAccessForImageContext(
readAccessTimeoutCtx,
mirrorCtx.DeckhouseRegistryRepo+":"+accessValidationTag,
mirrorCtx.RegistryAuth,
mirrorCtx.Insecure,
mirrorCtx.SkipTLSVerification,
); err != nil {
cancel()
if os.Getenv("MIRROR_BYPASS_ACCESS_CHECKS") != "1" {
return fmt.Errorf("Source registry access validation failure: %w", err)
}
}
cancel()

var versionsToMirror []semver.Version
var err error
Expand Down
17 changes: 17 additions & 0 deletions pkg/libmirror/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package bundle
import (
"archive/tar"
"bufio"
"context"
"errors"
"fmt"
"io"
Expand All @@ -32,13 +33,25 @@ import (
)

func Unpack(mirrorCtx *contexts.BaseContext) error {
return UnpackContext(context.Background(), mirrorCtx)
}

func UnpackContext(ctx context.Context, mirrorCtx *contexts.BaseContext) error {
if err := ctx.Err(); err != nil {
return err
}

bundleDir := filepath.Dir(mirrorCtx.BundlePath)
catalog, err := os.ReadDir(bundleDir)
if err != nil {
return fmt.Errorf("read tar bundle directory: %w", err)
}
streams := make([]io.Reader, 0)
for _, entry := range catalog {
if err = ctx.Err(); err != nil {
return err
}

fileName := entry.Name()
if !entry.Type().IsRegular() || filepath.Ext(fileName) != ".chunk" {
continue
Expand All @@ -61,6 +74,10 @@ func Unpack(mirrorCtx *contexts.BaseContext) error {

tarReader := tar.NewReader(bundleStream)
for {
if err = ctx.Err(); err != nil {
return err
}

tarHdr, err := tarReader.Next()
if errors.Is(err, io.EOF) {
break
Expand Down
5 changes: 3 additions & 2 deletions pkg/libmirror/layouts/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package layouts

import (
"context"
"fmt"
"path"
"strings"
Expand Down Expand Up @@ -184,8 +185,8 @@ func PullImageSet(
err = retry.RunTask(
pullCtx.Logger,
fmt.Sprintf("[%d / %d] Pulling %s ", pullCount, totalCount, imageReferenceString),
task.WithConstantRetries(5, 10*time.Second, func() error {
img, err := remote.Image(ref, remoteOpts...)
task.WithConstantRetries(5, 10*time.Second, func(ctx context.Context) error {
img, err := remote.Image(ref, append(remoteOpts, remote.WithContext(ctx))...)
if err != nil {
if errorutil.IsImageNotFoundError(err) && pullOpts.allowMissingTags {
pullCtx.Logger.WarnLn("⚠️ Not found in registry, skipping pull")
Expand Down
91 changes: 60 additions & 31 deletions pkg/libmirror/layouts/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package layouts

import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/hashicorp/go-multierror"
"github.com/samber/lo"
"github.com/samber/lo/parallel"

Expand All @@ -46,6 +48,27 @@ func PushLayoutToRepo(
logger contexts.Logger,
parallelismConfig contexts.ParallelismConfig,
insecure, skipVerifyTLS bool,
) error {
return PushLayoutToRepoContext(
context.Background(),
imagesLayout,
registryRepo,
authProvider,
logger,
parallelismConfig,
insecure,
skipVerifyTLS,
)
}

func PushLayoutToRepoContext(
ctx context.Context,
imagesLayout layout.Path,
registryRepo string,
authProvider authn.Authenticator,
logger contexts.Logger,
parallelismConfig contexts.ParallelismConfig,
insecure, skipVerifyTLS bool,
) error {
refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
if parallelismConfig.Blobs != 0 {
Expand Down Expand Up @@ -73,7 +96,9 @@ func PushLayoutToRepo(
tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
logger.InfoF("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef)
pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0)
if err = pushImage(ctx, registryRepo, index, manifestSet[0], refOpts, remoteOpts); err != nil {
return fmt.Errorf("Push Image: %w", err)
}
imagesCount += 1
continue
}
Expand All @@ -84,9 +109,17 @@ func PushLayoutToRepo(
logger.InfoF("- %s", registryRepo+":"+manifest.Annotations["io.deckhouse.image.short_tag"])
}

parallel.ForEach(manifestSet, pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts))
errMu := &sync.Mutex{}
merr := &multierror.Error{}
parallel.ForEach(manifestSet, func(item v1.Descriptor, i int) {
if err = pushImage(ctx, registryRepo, index, item, refOpts, remoteOpts); err != nil {
errMu.Lock()
defer errMu.Unlock()
merr = multierror.Append(merr, err)
}
})

return nil
return merr.ErrorOrNil()
})
if err != nil {
return fmt.Errorf("Push batch of images: %w", err)
Expand All @@ -99,44 +132,40 @@ func PushLayoutToRepo(
}

func pushImage(
logger contexts.Logger,
ctx context.Context,
registryRepo string,
index v1.ImageIndex,
imagesCount int,
manifest v1.Descriptor,
refOpts []name.Option,
remoteOpts []remote.Option,
) func(v1.Descriptor, int) {
return func(manifest v1.Descriptor, _ int) {
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
img, err := index.Image(manifest.Digest)
if err != nil {
logger.WarnF("Read image: %v", err)
os.Exit(1)
}
ref, err := name.ParseReference(imageRef, refOpts...)
if err != nil {
logger.WarnF("Parse image reference: %v", err)
os.Exit(1)
}
) error {
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
img, err := index.Image(manifest.Digest)
if err != nil {
return fmt.Errorf("Read image: %v", err)
}
ref, err := name.ParseReference(imageRef, refOpts...)
if err != nil {
return fmt.Errorf("Parse image reference: %v", err)
}

err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error {
if err = remote.Write(ref, img, remoteOpts...); err != nil {
err = retry.RunTaskWithContext(
ctx, silentLogger{}, "push",
task.WithConstantRetries(4, 3*time.Second, func(ctx context.Context) error {
if err = remote.Write(ref, img, append(remoteOpts, remote.WithContext(ctx))...); err != nil {
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
os.Exit(1)
return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning)
}
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
}
return nil
}))
if err != nil {
logger.WarnF("Push image: %v", err)
os.Exit(1)
}

imagesCount += 1
}),
)
if err != nil {
return fmt.Errorf("Run push task: %v", err)
}
return nil
}

type silentLogger struct{}
Expand Down
33 changes: 23 additions & 10 deletions pkg/libmirror/operations/push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package operations

import (
"context"
"errors"
"fmt"
"io/fs"
Expand All @@ -19,17 +20,21 @@ import (
)

func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error {
return PushDeckhouseToRegistryContext(context.Background(), mirrorCtx)
}

func PushDeckhouseToRegistryContext(ctx context.Context, mirrorCtx *contexts.PushContext) error {
logger := mirrorCtx.Logger
logger.InfoF("Looking for Deckhouse images to push")
ociLayouts, modulesList, err := findLayoutsToPush(mirrorCtx)
ociLayouts, modulesList, err := findLayoutsToPush(ctx, mirrorCtx)
if err != nil {
return fmt.Errorf("Find OCI Image Layouts to push: %w", err)
}

for repo, ociLayout := range ociLayouts {
logger.InfoLn("Mirroring", repo)
err = layouts.PushLayoutToRepo(
ociLayout, repo,
err = layouts.PushLayoutToRepoContext(
ctx, ociLayout, repo,
mirrorCtx.RegistryAuth,
mirrorCtx.Logger,
mirrorCtx.Parallelism,
Expand All @@ -54,22 +59,22 @@ func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error {
}

logger.InfoLn("Pushing modules tags")
if err = pushModulesTags(&mirrorCtx.BaseContext, modulesList); err != nil {
if err = pushModulesTags(ctx, &mirrorCtx.BaseContext, modulesList); err != nil {
return fmt.Errorf("Push modules tags: %w", err)
}
logger.InfoF("All modules tags are pushed")

return nil
}

func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) error {
func pushModulesTags(ctx context.Context, mirrorCtx *contexts.BaseContext, modulesList []string) error {
if len(modulesList) == 0 {
return nil
}

logger := mirrorCtx.Logger

refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptionsFromMirrorContext(mirrorCtx)
remoteOpts = append(remoteOpts, remote.WithContext(ctx))
modulesRepo := path.Join(mirrorCtx.RegistryHost, mirrorCtx.RegistryPath, "modules")
pushCount := 1
for _, moduleName := range modulesList {
Expand All @@ -93,7 +98,7 @@ func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) erro
return nil
}

func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) {
func findLayoutsToPush(ctx context.Context, mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) {
ociLayouts := make(map[string]layout.Path)
bundlePaths := [][]string{
{""}, // Root contains main deckhouse repo
Expand All @@ -107,6 +112,10 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path,
}

for _, bundlePath := range bundlePaths {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

indexRef := path.Join(append([]string{mirrorCtx.RegistryHost + mirrorCtx.RegistryPath}, bundlePath...)...)
layoutFileSystemPath := filepath.Join(append([]string{mirrorCtx.UnpackedImagesPath}, bundlePath...)...)
l, err := layout.FromPath(layoutFileSystemPath)
Expand All @@ -126,12 +135,16 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path,
return nil, nil, err
}

for _, dir := range dirs {
if !dir.IsDir() {
for _, dirEntry := range dirs {
if err = ctx.Err(); err != nil {
return nil, nil, err
}

if !dirEntry.IsDir() {
continue
}

moduleName := dir.Name()
moduleName := dirEntry.Name()
modulesNames = append(modulesNames, moduleName)
moduleRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName)
moduleReleasesRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName, "release")
Expand Down
22 changes: 19 additions & 3 deletions pkg/libmirror/util/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"crypto/tls"
"fmt"
"time"

"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
Expand All @@ -32,14 +31,21 @@ import (
)

func ValidateReadAccessForImage(imageTag string, authProvider authn.Authenticator, insecure, skipVerifyTLS bool) error {
return ValidateReadAccessForImageContext(context.Background(), imageTag, authProvider, insecure, skipVerifyTLS)
}

func ValidateReadAccessForImageContext(
ctx context.Context,
imageTag string,
authProvider authn.Authenticator,
insecure, skipVerifyTLS bool,
) error {
nameOpts, remoteOpts := MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
ref, err := name.ParseReference(imageTag, nameOpts...)
if err != nil {
return fmt.Errorf("Parse registry address: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
remoteOpts = append(remoteOpts, remote.WithContext(ctx))
_, err = remote.Head(ref, remoteOpts...)
if err != nil {
Expand All @@ -50,7 +56,17 @@ func ValidateReadAccessForImage(imageTag string, authProvider authn.Authenticato
}

func ValidateWriteAccessForRepo(repo string, authProvider authn.Authenticator, insecure, skipVerifyTLS bool) error {
return ValidateWriteAccessForRepoContext(context.Background(), repo, authProvider, insecure, skipVerifyTLS)
}

func ValidateWriteAccessForRepoContext(
ctx context.Context,
repo string,
authProvider authn.Authenticator,
insecure, skipVerifyTLS bool,
) error {
nameOpts, remoteOpts := MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
remoteOpts = append(remoteOpts, remote.WithContext(ctx))
ref, err := name.NewTag(repo+":d8WriteCheck", nameOpts...)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 183a751

Please sign in to comment.