Commit 215ebb8: wip
nicksanford committed Oct 17, 2024
1 parent 40cdc6d
Showing 4 changed files with 121 additions and 19 deletions.
2 changes: 2 additions & 0 deletions services/datamanager/builtin/config.go

@@ -49,6 +49,7 @@ type Config struct {
     ScheduledSyncDisabled bool    `json:"sync_disabled"`
     SelectiveSyncerName   string  `json:"selective_syncer_name"`
     SyncIntervalMins      float64 `json:"sync_interval_mins"`
+    Flag                  bool    `json:"flag"`
 }

 // Validate returns components which will be depended upon weakly due to the above matcher.
@@ -119,6 +120,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool, lo
     }

     return datasync.Config{
+        Flag:                c.Flag,
         AdditionalSyncPaths: c.AdditionalSyncPaths,
         Tags:                c.Tags,
         CaptureDir:          c.getCaptureDir(),
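Note: with the json tag above, the new field decodes alongside the existing sync attributes. A minimal sketch of that decoding, using a local mirror of the json-tagged fields rather than the actual builtin Config type (the attribute payload here is illustrative):

package main

import (
    "encoding/json"
    "fmt"
)

// config mirrors the json-tagged fields of the Config struct above; it is a
// local illustration, not the real datamanager builtin type.
type config struct {
    ScheduledSyncDisabled bool    `json:"sync_disabled"`
    SelectiveSyncerName   string  `json:"selective_syncer_name"`
    SyncIntervalMins      float64 `json:"sync_interval_mins"`
    Flag                  bool    `json:"flag"`
}

func main() {
    attrs := []byte(`{"sync_interval_mins": 0.1, "flag": true}`)
    var c config
    if err := json.Unmarshal(attrs, &c); err != nil {
        panic(err)
    }
    fmt.Printf("sync interval: %v mins, flag: %v\n", c.SyncIntervalMins, c.Flag)
}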
1 change: 1 addition & 0 deletions services/datamanager/builtin/sync/config.go

@@ -10,6 +10,7 @@ import (

 // Config is the sync config from builtin.
 type Config struct {
+    Flag bool
     // AdditionalSyncPaths defines the file system paths
     // that should be synced in addition to the CaptureDir.
     // Generally 3rd party programs will write arbitrary
2 changes: 1 addition & 1 deletion services/datamanager/builtin/sync/sync.go

@@ -388,7 +388,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
     retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
         msg := "error uploading data capture file %s, size: %s, md: %s"
         errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata())
-        bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger)
+        bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, s.config.Flag, logger)
         if err != nil {
             return 0, errors.Wrap(err, errMetadata)
         }
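Context: syncDataCaptureFile runs the upload closure above under newExponentialRetry, so a failed uploadDataCaptureFile call is retried with growing waits. The helper below is a hypothetical, simplified stand-in for that pattern (rdk's real constructor also takes a clock, a logger, and the file name); it only illustrates the retry contract the closure is written against:

package main

import (
    "context"
    "fmt"
    "time"
)

// retryExponential re-runs fn with doubling waits until it succeeds or the
// context is cancelled. Hypothetical stand-in, not rdk's newExponentialRetry.
func retryExponential(ctx context.Context, fn func(context.Context) (uint64, error)) (uint64, error) {
    wait := 50 * time.Millisecond
    for {
        n, err := fn(ctx)
        if err == nil {
            return n, nil
        }
        select {
        case <-ctx.Done():
            return 0, ctx.Err()
        case <-time.After(wait):
            wait *= 2 // back off before the next attempt
        }
    }
}

func main() {
    attempts := 0
    n, err := retryExponential(context.Background(), func(context.Context) (uint64, error) {
        attempts++
        if attempts < 3 {
            return 0, fmt.Errorf("transient failure %d", attempts)
        }
        return 42, nil
    })
    fmt.Println(n, err) // 42 <nil>
}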
135 changes: 117 additions & 18 deletions services/datamanager/builtin/sync/upload_data_capture_file.go

@@ -3,6 +3,7 @@ package sync
 import (
     "context"
     "fmt"
+    "io"

     "github.com/docker/go-units"
     "github.com/go-viper/mapstructure/v2"
@@ -25,34 +26,32 @@ var MaxUnaryFileSize = int64(units.MB)
 // uses StreamingDataCaptureUpload API so as to not exceed the unary response size.
 // Otherwise, uploads data over DataCaptureUpload API.
 // Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case.
-func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, logger logging.Logger) (uint64, error) {
+func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, flag bool, logger logging.Logger) (uint64, error) {
     logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size())

     md := f.ReadMetadata()

+    // camera.GetImages is a special case. For that API we make 2 binary data upload requests
+    if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages {
+        return uint64(f.Size()), uploadGetImages(ctx, conn, md, f, logger)
+    }
+
+    metaData := uploadMetadata(conn.partID, md, md.GetFileExtension())
+    if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && flag {
+        return uint64(f.Size()), uploadChunkedBinaryData(ctx, conn.client, metaData, f, logger)
+    }
+
     sensorData, err := data.SensorDataFromCaptureFile(f)
     if err != nil {
         return 0, errors.Wrap(err, "error reading sensor data")
     }

     // Do not attempt to upload a file without any sensor readings.
     if len(sensorData) == 0 {
         logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath())
         // log here as this will delete a .capture file without uploading it and without moving it to the failed directory
         return 0, nil
     }

-    if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && len(sensorData) > 1 {
-        return 0, fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath())
-    }
-
-    // camera.GetImages is a special case. For that API we make 2 binary data upload requests
-    if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages {
-        logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath())
-
-        return uint64(f.Size()), uploadGetImages(ctx, conn, md, sensorData[0], f.Size(), f.GetPath(), logger)
-    }
-
-    metaData := uploadMetadata(conn.partID, md, md.GetFileExtension())
     return uint64(f.Size()), uploadSensorData(ctx, conn.client, metaData, sensorData, f.Size(), f.GetPath(), logger)
 }

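The rewritten function above amounts to a three-way dispatch: binary GetImages data keeps its special two-request path, other binary data takes the new chunked streaming path when the flag is set, and everything else (including binary data with the flag unset) stays on the unary DataCaptureUpload path. A hypothetical summary of that routing (illustrative only, not part of the codebase):

package main

import "fmt"

// chooseUploadPath sketches the dispatch order of the new
// uploadDataCaptureFile; the function and its names are hypothetical.
func chooseUploadPath(isBinary bool, methodName string, flag bool) string {
    switch {
    case isBinary && methodName == "GetImages":
        return "uploadGetImages (two unary DataCaptureUpload requests)"
    case isBinary && flag:
        return "uploadChunkedBinaryData (StreamingDataCaptureUpload)"
    default:
        return "uploadSensorData (unary DataCaptureUpload)"
    }
}

func main() {
    fmt.Println(chooseUploadPath(true, "ReadImage", true)) // chunked streaming path
}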
@@ -73,11 +72,26 @@ func uploadGetImages(
     ctx context.Context,
     conn cloudConn,
     md *v1.DataCaptureMetadata,
-    sd *v1.SensorData,
-    size int64,
-    path string,
+    f *data.CaptureFile,
     logger logging.Logger,
 ) error {
+    logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath())
+
+    sensorData, err := data.SensorDataFromCaptureFile(f)
+    if err != nil {
+        return errors.Wrap(err, "error reading sensor data")
+    }
+
+    if len(sensorData) == 0 {
+        logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath())
+        // log here as this will delete a .capture file without uploading it and without moving it to the failed directory
+        return nil
+    }
+
+    if len(sensorData) > 1 {
+        return fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath())
+    }
+    sd := sensorData[0]
     var res pb.GetImagesResponse
     if err := mapstructure.Decode(sd.GetStruct().AsMap(), &res); err != nil {
         return errors.Wrap(err, "failed to decode camera.GetImagesResponse")
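The decode above uses mapstructure to turn the protobuf Struct's map form back into a typed GetImagesResponse. A self-contained demonstration of that library call, with a stand-in type instead of the camera proto (assumes github.com/go-viper/mapstructure/v2 is a module dependency, as in the import block above):

package main

import (
    "fmt"

    "github.com/go-viper/mapstructure/v2"
)

// image is a stand-in for a decoded proto message; only the tag matters here.
type image struct {
    SourceName string `mapstructure:"source_name"`
}

func main() {
    raw := map[string]any{"source_name": "cam1"} // e.g. from structpb's AsMap()
    var img image
    if err := mapstructure.Decode(raw, &img); err != nil {
        panic(err)
    }
    fmt.Println(img.SourceName) // cam1
}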
@@ -100,7 +114,7 @@ func uploadGetImages(
         metadata := uploadMetadata(conn.partID, md, getFileExtFromImageFormat(img.GetFormat()))
         // TODO: This is wrong as the size describes the size of the entire GetImages response, but we are only
         // uploading one of the 2 images in that response here.
-        if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, size, path, logger); err != nil {
+        if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, f.Size(), f.GetPath(), logger); err != nil {
             return errors.Wrapf(err, "failed uploading GetImages image index: %d", i)
         }
     }
@@ -123,6 +137,45 @@ func getImagesTimestamps(res *pb.GetImagesResponse, sensorData *v1.SensorData) (
     return timeRequested, timeReceived
 }

+func uploadChunkedBinaryData(
+    ctx context.Context,
+    client v1.DataSyncServiceClient,
+    uploadMD *v1.UploadMetadata,
+    f *data.CaptureFile,
+    logger logging.Logger,
+) error {
+    // If it's a large binary file, we need to upload it in chunks.
+    logger.Debugf("attempting to upload large binary file using StreamingDataCaptureUpload, file: %s", f.GetPath())
+    var smd v1.SensorMetadata
+    r, err := f.BinaryReader(&smd)
+    if err != nil {
+        return err
+    }
+    c, err := client.StreamingDataCaptureUpload(ctx)
+    if err != nil {
+        return errors.Wrap(err, "error creating StreamingDataCaptureUpload client")
+    }
+
+    // First send metadata.
+    streamMD := &v1.StreamingDataCaptureUploadRequest_Metadata{
+        Metadata: &v1.DataCaptureUploadMetadata{
+            UploadMetadata: uploadMD,
+            SensorMetadata: &smd,
+        },
+    }
+    if err := c.Send(&v1.StreamingDataCaptureUploadRequest{UploadPacket: streamMD}); err != nil {
+        return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata")
+    }
+
+    // Then call the function to send the rest.
+    if err := sendChunkedStreamingDCRequests(ctx, c, r, f.GetPath(), logger); err != nil {
+        return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync")
+    }
+
+    _, err = c.CloseAndRecv()
+    return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed")
+}
+
 func uploadSensorData(
     ctx context.Context,
     client v1.DataSyncServiceClient,
@@ -171,6 +224,52 @@ func uploadSensorData(
     return errors.Wrap(err, "DataCaptureUpload failed")
 }

+func sendChunkedStreamingDCRequests(
+    ctx context.Context,
+    stream v1.DataSyncService_StreamingDataCaptureUploadClient,
+    r io.Reader,
+    path string,
+    logger logging.Logger,
+) error {
+    chunk := make([]byte, UploadChunkSize)
+    // Loop until there is no more content to send.
+    chunkCount := 0
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+            n, errRead := r.Read(chunk)
+            if n > 0 {
+                // if there is data, send it
+                // Build request with contents.
+                uploadReq := &v1.StreamingDataCaptureUploadRequest{
+                    UploadPacket: &v1.StreamingDataCaptureUploadRequest_Data{
+                        Data: chunk[:n],
+                    },
+                }
+
+                // Send request
+                logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path)
+                if errSend := stream.Send(uploadReq); errSend != nil {
+                    return errSend
+                }
+            }
+
+            // if we reached the end of the file return nil err (success)
+            if errors.Is(errRead, io.EOF) {
+                return nil
+            }
+
+            // if Read hit an unexpected error, return the error
+            if errRead != nil {
+                return errRead
+            }
+            chunkCount++
+        }
+    }
+}
+
 func sendStreamingDCRequests(
     ctx context.Context,
     stream v1.DataSyncService_StreamingDataCaptureUploadClient,
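One subtlety in sendChunkedStreamingDCRequests above: an io.Reader may return n > 0 together with an error (including io.EOF) on the same call, so the loop sends any data before inspecting errRead. A standalone sketch of the same read-then-check pattern against an in-memory reader (the 4-byte chunk size is for demonstration; the real code uses UploadChunkSize):

package main

import (
    "bytes"
    "errors"
    "fmt"
    "io"
)

func main() {
    r := bytes.NewReader([]byte("abcdefghij"))
    chunk := make([]byte, 4) // demo size; rdk uses UploadChunkSize
    for {
        n, errRead := r.Read(chunk)
        if n > 0 {
            // Consume the data before looking at the error: a reader may
            // return bytes and io.EOF from the same call.
            fmt.Printf("chunk: %q\n", chunk[:n])
        }
        if errors.Is(errRead, io.EOF) {
            fmt.Println("done")
            return
        }
        if errRead != nil {
            panic(errRead)
        }
    }
}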
