Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): support run .ts serverless #935

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// defaultSFNFile is the default serverless file name
const (
defaultSFNSourceFile = "app.go"
defaultSFNSourceTSFile = "app.ts"
defaultSFNTestSourceFile = "app_test.go"
defaultSFNCompliedFile = "sfn.yomo"
defaultSFNWASIFile = "sfn.wasm"
Expand Down
28 changes: 8 additions & 20 deletions cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cli

import (
"os"
"path/filepath"

"github.com/spf13/cobra"
"github.com/yomorun/yomo/pkg/log"
Expand All @@ -27,6 +26,7 @@ import (
_ "github.com/yomorun/yomo/cli/serverless/deno"
_ "github.com/yomorun/yomo/cli/serverless/exec"
_ "github.com/yomorun/yomo/cli/serverless/golang"
_ "github.com/yomorun/yomo/cli/serverless/nodejs"
_ "github.com/yomorun/yomo/cli/serverless/wasm"
"github.com/yomorun/yomo/cli/viper"
)
Expand All @@ -37,7 +37,7 @@ var runCmd = &cobra.Command{
Short: "Run a YoMo Stream Function",
Long: "Run a YoMo Stream Function",
Run: func(cmd *cobra.Command, args []string) {
if err := parseFileArg(args, &opts, defaultSFNCompliedFile, defaultSFNWASIFile, defaultSFNSourceFile); err != nil {
if err := parseFileArg(args, &opts, defaultSFNCompliedFile, defaultSFNWASIFile, defaultSFNSourceFile, defaultSFNSourceTSFile); err != nil {
log.FailureStatusEvent(os.Stdout, err.Error())
return
}
Expand Down Expand Up @@ -68,24 +68,12 @@ var runCmd = &cobra.Command{
)
return
}
// build if it's go file
if ext := filepath.Ext(opts.Filename); ext == ".go" {
log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...")
if err := s.Build(true); err != nil {
log.FailureStatusEvent(os.Stdout, err.Error())
os.Exit(127)
}
log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!")
}
// run
// wasi
if ext := filepath.Ext(opts.Filename); ext == ".wasm" {
wasmRuntime := opts.Runtime
if wasmRuntime == "" {
wasmRuntime = "wazero"
}
log.InfoStatusEvent(os.Stdout, "WASM runtime: %s", wasmRuntime)

if err := s.Build(true); err != nil {
log.FailureStatusEvent(os.Stdout, err.Error())
os.Exit(127)
}

log.InfoStatusEvent(
os.Stdout,
"Starting YoMo Stream Function instance, connecting to zipper: %v",
Expand All @@ -103,7 +91,7 @@ func init() {
rootCmd.AddCommand(runCmd)

runCmd.Flags().StringVarP(&opts.ZipperAddr, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr")
runCmd.Flags().StringVarP(&opts.Name, "name", "n", "app", "yomo stream function name.")
runCmd.Flags().StringVarP(&opts.Name, "name", "n", "", "yomo stream function name.")
runCmd.Flags().StringVarP(&opts.ModFile, "modfile", "m", "", "custom go.mod")
runCmd.Flags().StringVarP(&opts.Credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`")
runCmd.Flags().StringVarP(&opts.Runtime, "runtime", "r", "", "serverless runtime type")
Expand Down
2 changes: 1 addition & 1 deletion cli/serverless/deno/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ func (s *denoServerless) Executable() bool {
}

func init() {
serverless.Register(&denoServerless{}, ".js", ".ts")
serverless.Register(&denoServerless{}, ".js")
}
3 changes: 2 additions & 1 deletion cli/serverless/golang/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *GolangServerless) Init(opts *serverless.Options) error {

// Build compiles the serverless to executable
func (s *GolangServerless) Build(clean bool) error {
log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...")
// check if the file exists
appPath := s.source
if _, err := os.Stat(appPath); os.IsNotExist(err) {
Expand Down Expand Up @@ -183,7 +184,6 @@ func (s *GolangServerless) Build(clean bool) error {
err = fmt.Errorf("Build: go mod tidy err %s", out)
return err
}
// build
// wasi
dir, _ := filepath.Split(s.opts.Filename)
filename := "sfn.yomo"
Expand Down Expand Up @@ -212,6 +212,7 @@ func (s *GolangServerless) Build(clean bool) error {
if clean {
file.Remove(s.tempDir)
}
log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cli/serverless/golang/templates/main.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func run(_ *cobra.Command, _ []string) {

func init() {
rootCmd.Flags().StringVarP(&zipper, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr")
rootCmd.Flags().StringVarP(&name, "name", "n", "app", "yomo stream function name")
rootCmd.Flags().StringVarP(&name, "name", "n", "", "yomo stream function name")
rootCmd.Flags().StringVarP(&credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`")
viper.SetEnvPrefix("YOMO_SFN")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
Expand Down
2 changes: 1 addition & 1 deletion cli/serverless/golang/templates/main_rx.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func run(_ *cobra.Command, _ []string) {

func init() {
rootCmd.Flags().StringVarP(&zipper, "zipper", "z", "localhost:9000", "YoMo-Zipper endpoint addr")
rootCmd.Flags().StringVarP(&name, "name", "n", "app", "yomo stream function name")
rootCmd.Flags().StringVarP(&name, "name", "n", "", "yomo stream function name")
rootCmd.Flags().StringVarP(&credential, "credential", "d", "", "client credential payload, eg: `token:dBbBiRE7`")
viper.SetEnvPrefix("YOMO_SFN")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
Expand Down
144 changes: 144 additions & 0 deletions cli/serverless/nodejs/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package nodejs

import (
"errors"
"fmt"
"html/template"
"os"
"os/exec"
"path/filepath"

_ "embed"
)

//go:embed templates/wrapper_ts.tmpl
var WrapperTSTmpl string

var (
wrapperTS = ".wrapper.ts"
wrapperJS = ".wrapper.js"
)

// NodejsWrapper is the nodejs implementation of wrapper.
type NodejsWrapper struct {
functionName string
workDir string // eg. src/
entryTSFile string // eg. src/app.ts
entryJSFile string // eg. src/app.js
fileName string // eg. src/app

// command path
nodePath string
npmPath string
}

// NewWrapper returns a new NodejsWrapper
func NewWrapper(functionName, entryTSFile string) (*NodejsWrapper, error) {
// check command
nodePath, err := exec.LookPath("node")
if err != nil {
return nil, errors.New("[node] command was not found. to run the sfn in ts, you need to install node. For details, visit https://nodejs.org")
}
npmPath, err := exec.LookPath("pnpm")
if err != nil {
npmPath, _ = exec.LookPath("npm")
}

ext := filepath.Ext(entryTSFile)
if ext != ".ts" {
return nil, fmt.Errorf("only support typescript, got: %s", entryTSFile)
}
workdir := filepath.Dir(entryTSFile)

entryJSFile := entryTSFile[:len(entryTSFile)-len(ext)] + ".js"

fileName := entryTSFile[:len(entryTSFile)-len(filepath.Ext(entryTSFile))]

w := &NodejsWrapper{
functionName: functionName,
workDir: workdir,
entryTSFile: entryTSFile,
entryJSFile: entryJSFile,
fileName: fileName,
nodePath: nodePath,
npmPath: npmPath,
}

return w, nil
}

// WorkDir returns the working directory of the serverless function to build and run.
func (w *NodejsWrapper) WorkDir() string {
return w.workDir
}

// Build defines how to build the serverless function.
func (w *NodejsWrapper) Build(env []string) error {
// 1. generate .wrapper.ts file
dstPath := filepath.Join(w.workDir, wrapperTS)
_ = os.Remove(dstPath)

if err := w.genWrapperTS(w.functionName, dstPath); err != nil {
return err
}

// 2. install dependencies
cmd := exec.Command(w.npmPath, "install")
cmd.Dir = w.workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env
if err := cmd.Run(); err != nil {
return err
}

// 3. compile .wrapper.ts file to .wrapper.js
cmd2 := exec.Command("tsc", wrapperTS)
cmd2.Dir = w.workDir
cmd2.Stdout = os.Stdout
cmd2.Stderr = os.Stderr
cmd2.Env = env
if err := cmd2.Run(); err != nil {
return err
}

return nil
}

// Run runs the serverless function
func (w *NodejsWrapper) Run(env []string) error {
cmd := exec.Command(w.nodePath, wrapperJS)
cmd.Dir = w.workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env

return cmd.Run()
}

func (w *NodejsWrapper) genWrapperTS(functionName, dstPath string) error {
data := struct {
WorkDir string
FunctionName string
FileName string
FilePath string
}{
WorkDir: w.workDir,
FunctionName: functionName,
FileName: w.fileName,
FilePath: w.entryTSFile,
}

dst, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return err
}
defer dst.Close()

t := template.Must(template.New("wrapper").Parse(WrapperTSTmpl))
if err := t.Execute(dst, data); err != nil {
return err
}

return nil
}
59 changes: 59 additions & 0 deletions cli/serverless/nodejs/serverless.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package nodejs provides a ts serverless runtime
package nodejs

import (
"os"

"github.com/yomorun/yomo/cli/serverless"
"github.com/yomorun/yomo/pkg/wrapper"
)

// nodejsServerless will start js program to run serverless functions.
type nodejsServerless struct {
name string
zipperAddr string
credential string
wrapper *NodejsWrapper
}

// Init initializes the nodejs serverless
func (s *nodejsServerless) Init(opts *serverless.Options) error {
wrapper, err := NewWrapper(opts.Name, opts.Filename)
if err != nil {
return err
}

s.name = opts.Name
s.zipperAddr = opts.ZipperAddr
s.credential = opts.Credential
s.wrapper = wrapper

return nil
}

// Build calls wrapper.Build
func (s *nodejsServerless) Build(_ bool) error {
return s.wrapper.Build(os.Environ())
}

// Run the wrapper.Run
func (s *nodejsServerless) Run(verbose bool) error {
err := serverless.LoadEnvFile(s.wrapper.workDir)
if err != nil {
return err
}
env := os.Environ()
if verbose {
env = append(env, "YOMO_LOG_LEVEL=debug")
}
return wrapper.Run(s.name, s.zipperAddr, s.credential, s.wrapper, env)
}

// Executable shows whether the program needs to be built
func (s *nodejsServerless) Executable() bool {
return true
}

func init() {
serverless.Register(&nodejsServerless{}, ".ts")
}
62 changes: 62 additions & 0 deletions cli/serverless/nodejs/templates/wrapper_ts.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { join } from 'path'
import {
_genTools,
_readSFNData,
_writeSFNData,
_writeSFNHeader,
_FunctionCall,
_createConnection,
withTimeout,
} from '@yomo/sfn'
import { description, tag, handler } from '{{ .FileName }}'

const WORK_DIR = '{{ .WorkDir }}'
const FUNCTION_NAME = '{{ .FunctionName }}'
const SFN_FILE_PATH = '{{ .FilePath }}'
const SOCK_PATH = join(WORK_DIR, 'sfn.sock');
const REDUCE_TAG = 0xe001;

const TIMEOUT_DURATION = 1000 * 60

function run() {
if (
!description ||
!handler ||
tag === undefined ||
tag === null
) {
throw Error('description, tags, handler signature must be exported!')
}
const tools = _genTools(FUNCTION_NAME, description, SFN_FILE_PATH)
const header = JSON.stringify({
tags: [tag],
function_definition: JSON.stringify(tools, null, 2)
})
const { conn } = _createConnection(SOCK_PATH, {
onConnect: () => {
_writeSFNHeader(conn, header)
},
onData: async (buf: Buffer) => {
const { tag, data } = _readSFNData(buf)
const fc = new _FunctionCall(data)
const args = fc.readLLMArguments()
let result: any
if (typeof handler === 'function') {
try {
result = await withTimeout(handler(args), TIMEOUT_DURATION)
} catch (error) {
if (error instanceof Error && error.message === 'timeout') {
fc.writeLLMResult('timeout in this function calling, you should ignore this.', false)
_writeSFNData(conn, REDUCE_TAG, JSON.stringify(fc.data))
}
return
}
}
if (!result) return
fc.writeLLMResult(JSON.stringify(result))
_writeSFNData(conn, REDUCE_TAG, JSON.stringify(fc.data))
}
})
}

run()
Loading