Skip to content

Commit 9a8bb78

Browse files
authored
Add download command for functions CLI (#221)
Signed-off-by: xiaolong.ran <[email protected]>
1 parent 16ccacd commit 9a8bb78

File tree

6 files changed

+272
-0
lines changed

6 files changed

+272
-0
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/streamnative/pulsarctl
33
go 1.12
44

55
require (
6+
github.com/davecgh/go-spew v1.1.1
67
github.com/dgrijalva/jwt-go v3.2.0+incompatible
78
github.com/docker/go-connections v0.4.0
89
github.com/fatih/color v1.7.0

pkg/ctl/functions/download.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package functions
19+
20+
import (
21+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
22+
"github.com/streamnative/pulsarctl/pkg/pulsar/common"
23+
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
24+
25+
"github.com/spf13/pflag"
26+
)
27+
28+
func downloadFunctionsCmd(vc *cmdutils.VerbCmd) {
29+
desc := cmdutils.LongDescription{}
30+
desc.CommandUsedFor = "This command is used for download File Data from Pulsar."
31+
desc.CommandPermission = "This command requires super-user permissions."
32+
33+
var examples []cmdutils.Example
34+
download := cmdutils.Example{
35+
Desc: "Download File Data from Pulsar",
36+
Command: "pulsarctl functions download \n" +
37+
"\t--destination-file public\n" +
38+
"\t--path default\n",
39+
}
40+
downloadByNs := cmdutils.Example{
41+
Desc: "Download File Data from Pulsar",
42+
Command: "pulsarctl functions download \n" +
43+
"\t--destination-file public\n" +
44+
"\t--tenant public\n" +
45+
"\t--namespace default\n" +
46+
"\t--name <function-name>\n",
47+
}
48+
examples = append(examples, download, downloadByNs)
49+
desc.CommandExamples = examples
50+
51+
var out []cmdutils.Output
52+
successOut := cmdutils.Output{
53+
Desc: "normal output",
54+
Out: "Downloaded <the name of a Pulsar Function> successfully",
55+
}
56+
failOut := cmdutils.Output{
57+
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
58+
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
59+
}
60+
61+
out = append(out, successOut, failOut)
62+
desc.CommandOutput = out
63+
64+
vc.SetDescription(
65+
"download",
66+
"Download File Data from Pulsar",
67+
desc.ToString(),
68+
desc.ExampleToString(),
69+
"download",
70+
)
71+
72+
functionData := &utils.FunctionData{}
73+
74+
// set the run function
75+
vc.SetRunFunc(func() error {
76+
return doDownloadFunctions(vc, functionData)
77+
})
78+
79+
// register the params
80+
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
81+
flagSet.StringVar(
82+
&functionData.DestinationFile,
83+
"destination-file",
84+
"",
85+
"The file to store downloaded content")
86+
87+
flagSet.StringVar(
88+
&functionData.Path,
89+
"path",
90+
"",
91+
"Path to store the content")
92+
93+
flagSet.StringVar(
94+
&functionData.FQFN,
95+
"fqfn",
96+
"",
97+
"The Fully Qualified Function Name (FQFN) for the function")
98+
99+
flagSet.StringVar(
100+
&functionData.Tenant,
101+
"tenant",
102+
"",
103+
"Tenant name")
104+
105+
flagSet.StringVar(
106+
&functionData.Namespace,
107+
"namespace",
108+
"",
109+
"Namespace name")
110+
111+
flagSet.StringVar(
112+
&functionData.FuncName,
113+
"name",
114+
"",
115+
"Function name")
116+
})
117+
}
118+
119+
func doDownloadFunctions(vc *cmdutils.VerbCmd, funcData *utils.FunctionData) error {
120+
err := processBaseArguments(funcData)
121+
if err != nil {
122+
return err
123+
}
124+
admin := cmdutils.NewPulsarClientWithAPIVersion(common.V3)
125+
126+
if funcData.Path != "" {
127+
err := admin.Functions().DownloadFunction(funcData.DestinationFile, funcData.Path)
128+
if err != nil {
129+
return err
130+
}
131+
} else {
132+
err := admin.Functions().DownloadFunctionByNs(funcData.DestinationFile, funcData.Tenant,
133+
funcData.Namespace, funcData.FuncName)
134+
if err != nil {
135+
return err
136+
}
137+
}
138+
139+
vc.Command.Printf("Downloaded %s successfully\n", funcData.DestinationFile)
140+
return nil
141+
}

pkg/ctl/functions/download_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package functions
19+
20+
import (
21+
"os"
22+
"strings"
23+
"testing"
24+
25+
"github.com/stretchr/testify/assert"
26+
)
27+
28+
func TestDownloadFunctions(t *testing.T) {
29+
jarName := "dummyExample.jar"
30+
_, err := os.Create(jarName)
31+
assert.Nil(t, err)
32+
33+
defer os.Remove(jarName)
34+
35+
args := []string{"create",
36+
"--tenant", "public",
37+
"--namespace", "default",
38+
"--name", "test-functions-download",
39+
"--inputs", "test-input-topic",
40+
"--output", "persistent://public/default/test-output-topic",
41+
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
42+
"--jar", jarName,
43+
}
44+
45+
_, _, err = TestFunctionsCommands(createFunctionsCmd, args)
46+
assert.Nil(t, err)
47+
48+
downloadArgs := []string{"download",
49+
"--tenant", "public",
50+
"--namespace", "default",
51+
"--name", "test-functions-download",
52+
"--destination-file", "./dummyExample.jar",
53+
}
54+
outPut, execErr, err := TestFunctionsCommands(downloadFunctionsCmd, downloadArgs)
55+
assert.Equal(t, outPut.String(), "Downloaded ./dummyExample.jar successfully\n")
56+
assert.Nil(t, execErr)
57+
assert.Nil(t, err)
58+
}
59+
60+
func TestDownloadFunctionsWithFailure(t *testing.T) {
61+
notExistNameOrFqfnArgs := []string{"download",
62+
"--tenant", "public",
63+
"--namespace", "default",
64+
}
65+
_, execErrMsg, _ := TestFunctionsCommands(downloadFunctionsCmd, notExistNameOrFqfnArgs)
66+
failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)"
67+
assert.NotNil(t, execErrMsg)
68+
assert.True(t, strings.Contains(execErrMsg.Error(), failMsg))
69+
}

pkg/ctl/functions/functions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
5454
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd)
5555
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd)
5656
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd)
57+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, downloadFunctionsCmd)
5758

5859
return resourceCmd
5960
}

pkg/pulsar/functions.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,24 @@ type Functions interface {
5757
// DeleteFunction delete an existing function
5858
DeleteFunction(tenant, namespace, name string) error
5959

60+
// Download Function Code
61+
// @param destinationFile
62+
// file where data should be downloaded to
63+
// @param path
64+
// Path where data is located
65+
DownloadFunction(path, destinationFile string) error
66+
67+
// Download Function Code
68+
// @param destinationFile
69+
// file where data should be downloaded to
70+
// @param tenant
71+
// Tenant name
72+
// @param namespace
73+
// Namespace name
74+
// @param function
75+
// Function name
76+
DownloadFunctionByNs(destinationFile, tenant, namespace, function string) error
77+
6078
// StartFunction start all function instances
6179
StartFunction(tenant, namespace, name string) error
6280

@@ -260,6 +278,46 @@ func (f *functions) DeleteFunction(tenant, namespace, name string) error {
260278
return f.pulsar.Client.Delete(endpoint)
261279
}
262280

281+
func (f *functions) DownloadFunction(path, destinationFile string) error {
282+
endpoint := f.pulsar.endpoint(f.basePath, "download")
283+
_, err := os.Open(destinationFile)
284+
if err != nil {
285+
_, err = os.Create(destinationFile)
286+
if err != nil {
287+
return err
288+
}
289+
}
290+
291+
tmpMap := make(map[string]string)
292+
tmpMap["path"] = path
293+
294+
_, err = f.pulsar.Client.GetWithQueryParams(endpoint, nil, tmpMap, false)
295+
296+
if err != nil {
297+
return err
298+
}
299+
300+
return nil
301+
}
302+
303+
func (f *functions) DownloadFunctionByNs(destinationFile, tenant, namespace, function string) error {
304+
endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, function, "download")
305+
_, err := os.Open(destinationFile)
306+
if err != nil {
307+
_, err = os.Create(destinationFile)
308+
if err != nil {
309+
return err
310+
}
311+
}
312+
313+
err = f.pulsar.Client.Get(endpoint, nil)
314+
if err != nil {
315+
return err
316+
}
317+
318+
return nil
319+
}
320+
263321
func (f *functions) StartFunction(tenant, namespace, name string) error {
264322
endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, name)
265323
return f.pulsar.Client.Post(endpoint+"/start", "")

pkg/pulsar/utils/data.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ type FunctionData struct {
7373
FunctionConfigFile string `json:"fnConfigFile"`
7474
ProcessingGuarantees string `json:"processingGuarantees"`
7575
UserConfig string `json:"userConfigString"`
76+
DestinationFile string `json:"destinationFile"`
77+
Path string `json:"path"`
7678
FuncConf *FunctionConfig `json:"-"`
7779
}
7880

0 commit comments

Comments
 (0)