Conversation


@aliamerj aliamerj commented Sep 1, 2025

Describe your changes

This PR connects the Job API with the peer job stream and updates both server and client code to work together.

What was done:

  • Integrated Job API with the JobManager stream channel.
  • Updated server logic to send and track jobs through the channel.
  • Implemented client-side handling in the engine for job requests and responses.
  • Adjusted existing code to align with the new unified job flow.

Issue ticket number and link #4354

Checklist

  • Is it a bug fix
  • Is it a typo/documentation fix
  • Is it a feature enhancement
  • Is it a refactor
  • Created tests that fail without the change (if possible)

By submitting this pull request, you confirm that you have read and agree to the terms of the Contributor License Agreement.

Documentation

Select exactly one:

  • I added/updated documentation for this change
  • Documentation is not needed for this change (explain why)

Docs PR URL (required if "docs added" is checked)

Paste the PR link from https://github.com/netbirdio/docs here:

https://github.com/netbirdio/docs/pull/__

integrate api with stream and implement some client side
@@ -298,7 +298,7 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.

// blocking until error
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
-	if err != nil {
+	if err != nil && err != io.EOF {

@aliamerj aliamerj Sep 1, 2025

I noticed that when sending a job, I was repeatedly seeing EOF errors logged by the sync stream. This change is not ideal, but for now: when a stream request is sent for a peer, both the sync and job streams are triggered; one gets a valid request and the other gets nothing, so we hit this error and disconnect the peer. I think we need to drop the io.EOF check in sync, as I did for the job stream.

Comment on lines 914 to 929
case *mgmProto.JobRequest_Bundle:
	uploadKey, err := e.handleBundle(params.Bundle)
	if err != nil {
		return &mgmProto.JobResponse{
			ID:     msg.ID,
			Status: mgmProto.JobStatus_failed,
			Reason: []byte(err.Error()),
		}
	}
	return &mgmProto.JobResponse{
		ID:     msg.ID,
		Status: mgmProto.JobStatus_succeeded,
		WorkloadResults: &mgmProto.JobResponse_Bundle{
			Bundle: &mgmProto.BundleResult{
				UploadKey: uploadKey,
			},
Collaborator

Here you are creating unnecessary code duplication. The only case-specific part is WorkloadResults; the rest is identical and can be built after the switch.

Collaborator Author

The generic parts are:

  1. Status could be succeeded or failed
  2. Reason could be nil or an error message
  3. WorkloadResults could be nil or the result

So the only identical part is the job ID.

Comment on lines 141 to 144
	var job types.Job
	if err := job.ApplyResponse(jobResponse); err != nil {
		return status.Errorf(status.Internal, err.Error())
	}
Collaborator

I would move this conversion to the gRPC layer so we do not drag proto structs too deep into the code.
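The suggested boundary conversion could look like the following sketch, with stand-in types (the real types.Job and proto message differ; this only illustrates keeping proto structs at the gRPC edge):

```go
package main

import "fmt"

// protoJobResponse mimics the wire message; Job is the internal
// domain type. Both are hypothetical stand-ins for this sketch.
type protoJobResponse struct{ ID, Status string }
type Job struct{ ID, Status string }

// jobFromProto lives at the gRPC boundary: everything below it
// receives the domain type and never sees the proto struct.
func jobFromProto(p *protoJobResponse) (Job, error) {
	if p.ID == "" {
		return Job{}, fmt.Errorf("missing job ID")
	}
	return Job{ID: p.ID, Status: p.Status}, nil
}

func main() {
	j, err := jobFromProto(&protoJobResponse{ID: "j1", Status: "succeeded"})
	fmt.Println(j.ID, err)
}
```

With this shape, the manager code at lines 141 to 144 would accept a `types.Job` directly instead of calling ApplyResponse on the proto message.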

Comment on lines 899 to 912
func (e *Engine) getPeerClient() (*grpc.ClientConn, error) {
	conn, err := grpc.NewClient(
		strings.TrimPrefix(e.daemonAddress, "tcp://"),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
			"If the daemon is not running please run: "+
			"\nnetbird service install \nnetbird service start\n", err)
	}

	return conn, nil
}

Collaborator

I don't think this is the way to go. The engine is part of the daemon, so we would basically be connecting to ourselves.

Collaborator Author

Yes, I understand; that is why I needed help figuring out the best way to call bundle. Should we pass a gRPC client and call it through that, pass the address string and call bundle from there, or skip gRPC entirely and call the debugging code directly? We also need to consider the other OSes.
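The in-process option from the question above can be sketched by injecting the bundle code as a dependency instead of dialing the daemon's own gRPC endpoint. All names here are hypothetical, not NetBird's actual API:

```go
package main

import "fmt"

// BundleGenerator abstracts the debug-bundle code so the engine can
// call it in-process; the interface is a hypothetical sketch.
type BundleGenerator interface {
	CreateBundle() (path string, err error)
}

type localBundler struct{}

func (localBundler) CreateBundle() (string, error) {
	return "/tmp/netbird.debug.zip", nil
}

// Engine holds the dependency; the daemon wires in the same
// implementation its gRPC server uses, so no loopback dial is needed
// and the code stays portable across OSes.
type Engine struct{ bundler BundleGenerator }

func (e *Engine) handleBundle() (string, error) {
	return e.bundler.CreateBundle()
}

func main() {
	e := &Engine{bundler: localBundler{}}
	p, err := e.handleBundle()
	fmt.Println(p, err)
}
```

This sidesteps both the self-connection and the insecure loopback credentials from getPeerClient.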

@@ -112,7 +112,7 @@ func (c *Client) Run(urlOpener URLOpener, dns *DNSList, dnsReadyListener DnsRead

// todo do not throw error in case of cancelled context
ctx = internal.CtxInitState(ctx)
-	c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder)
+	c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder, "")

@aliamerj aliamerj Sep 15, 2025

Do we need to pass the logFile here?

@@ -196,7 +196,8 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr
r := peer.NewRecorder(config.ManagementURL.String())
r.GetFullStatus()

-	connectClient := internal.NewConnectClient(ctx, config, r)
+	//todo: do we need to pass logFile here ?
+	connectClient := internal.NewConnectClient(ctx, config, r, "")
Collaborator Author

Do we need to pass the logFile here?

-	client := internal.NewConnectClient(ctx, c.config, recorder)
+	//todo: do we need to pass logFile here ?
+	client := internal.NewConnectClient(ctx, c.config, recorder, "")

@aliamerj aliamerj Sep 15, 2025

Do we need to pass the logFile here?

@@ -127,7 +127,8 @@ func (c *Client) Run(fd int32, interfaceName string) error {
c.onHostDnsFn = func([]string) {}
cfg.WgIface = interfaceName

-	c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder)
+	//todo: do we need to pass logFile here ?
+	c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder, "")
Collaborator Author

Do we need to pass the logFile here?

@@ -235,7 +233,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *profilemanage

runOperation := func() error {
log.Tracef("running client connection")
-	s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
+	s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, s.logFile)
Collaborator Author

Here is the only place where we pass the logFile.

@aliamerj aliamerj requested a review from pappz September 16, 2025 14:49

Quality Gate failed

Failed conditions
3 New issues
3 New Code Smells (required ≤ 0)

See analysis details on SonarQube Cloud
