Skip to content

Commit

Permalink
support named-pipe-ipc for external process
Browse files Browse the repository at this point in the history
  • Loading branch information
whiteCcinn committed May 20, 2021
1 parent 87c9d90 commit 315fa6d
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 33 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ example/daemon
example/daemon_recover
example/daemon_signal
example/*.log
example/named-pipe-ipc.go
example/golang.pipe*
go.sum
.idea
.idea
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
- [daemon](https://github.com/whiteCcinn/daemon/blob/main/example/daemon.go)
- [daemon-recover](https://github.com/whiteCcinn/daemon/blob/main/example/daemon_recover.go)
- [daemon-signal](https://github.com/whiteCcinn/daemon/blob/main/example/daemon_signal.go)
- [named-pipe-ipc](https://github.com/whiteCcinn/daemon/blob/main/example/named-pipe-ipc.go)

## Log

Expand Down Expand Up @@ -123,6 +124,10 @@ main.main()
2021/05/19 07:51:05 sigterm
[supervisor(1775)] [stop heart -pid 1782] [safety exit]
2021/05/19 07:51:09 1782 end...
root@87ced9181ef6:/www/example# ./named-pipe-ipc
2021/05/20 10:22:32 count:3/2; errNum:0/0
```

## Terminal
Expand Down
7 changes: 7 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package daemon

import "fmt"

func (dctx *Context) Information() string{
return fmt.Sprintf("count:%d/%d; errNum:%d/%d", dctx.Count, dctx.MaxCount, dctx.ErrNum, dctx.MaxError)
}
125 changes: 94 additions & 31 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/json"
"fmt"
"github.com/erikdubbelboer/gspt"
"github.com/whiteCcinn/named-pipe-ipc"
"io"
"log"
"os"
"os/exec"
"strconv"
"sync"
"syscall"
"time"
)
Expand Down Expand Up @@ -65,11 +67,22 @@ type Context struct {

*exec.Cmd
ExtraFiles []*os.File
Pid int // supervisor pid
CPid int // main pid
// supervisor pid
Pid int
// main pid
CPid int
// start count
Count int
// start error number
ErrNum int

// Restart after callback
RestartCallback

namedPipeCtx *named_pipe_ipc.Context

noNamedPipeOnce sync.Once
namedPipeOnce sync.Once
}

type RestartCallback func(ctx context.Context)
Expand Down Expand Up @@ -170,21 +183,20 @@ func (dctx *Context) Run(ctx context.Context) error {
log.Fatal(err)
}

count := 1
dctx.Count = 1
isReset := false
errNum := 0
dctx.ErrNum = 0
for {
//daemon information
dInfo := fmt.Sprintf("count:%d/%d; errNum:%d/%d", count, dctx.MaxCount, errNum, dctx.MaxError)
if errNum > dctx.MaxError {
if dctx.ErrNum > dctx.MaxError {
dctx.log("[supervisor(%d)] [child process fails too many times]\n", dctx.Pid)
os.Exit(1)
}
if dctx.MaxCount > 0 && count > dctx.MaxCount {
if dctx.MaxCount > 0 && dctx.Count > dctx.MaxCount {
dctx.log("[supervisor(%d)] [reboot too many times quit]\n", dctx.Pid)
os.Exit(0)
}
count++
dctx.Count++

r, w, err := os.Pipe()
if err != nil {
Expand Down Expand Up @@ -212,7 +224,7 @@ func (dctx *Context) Run(ctx context.Context) error {
cmd, err := Background(ctx, dctx, WithNoExit())
if err != nil {
dctx.log("[supervisor(%d)] [child process start failed, err: %s]\n", dctx.Pid, err)
errNum++
dctx.ErrNum++
continue
}

Expand All @@ -239,31 +251,82 @@ func (dctx *Context) Run(ctx context.Context) error {

// supervisor process
gspt.SetProcTitle(fmt.Sprintf("heart -pid %d", dctx.CPid))
if count > 2 || isReset {
if dctx.Count > 2 || isReset {
if dctx.RestartCallback != nil {
dctx.RestartCallback(ctx)
}
}

// read from child process
go func() {
for {
var data PipeMessage
decoder := json.NewDecoder(r)
if err := decoder.Decode(&data); err != nil {
log.Printf("decode r, err: %v", err)
break
}
if data.Type != ProcessToSupervisor {
continue
}

if data.Behavior == WantSafetyClose {
dctx.log("[supervisor(%d)] [stop heart -pid %d] [safety exit]\n", dctx.Pid, dctx.CPid)
os.Exit(0)
dctx.noNamedPipeOnce.Do(func() {
dctx.log("[supervisor(%d)] [no-named-pipe-ipc] [listen]\n", dctx.Pid)
go func() {
for {
var data PipeMessage
decoder := json.NewDecoder(r)
if err := decoder.Decode(&data); err != nil {
log.Printf("decode r, err: %v", err)
break
}
if data.Type != ProcessToSupervisor {
continue
}

if data.Behavior == WantSafetyClose {
dctx.log("[supervisor(%d)] [stop heart -pid %d] [safety exit]\n", dctx.Pid, dctx.CPid)
os.Exit(0)
}
}
}()
})

// named-pipe-ipc
dctx.namedPipeOnce.Do(func() {
dctx.namedPipeCtx, err = named_pipe_ipc.NewContext(context.Background(), "./", named_pipe_ipc.S)
if err != nil {
log.Fatal(err)
}
}()
dctx.log("[supervisor(%d)] [named-pipe-ipc] [listen]\n", dctx.Pid)
go func() {
go func() {
for {
msg, err := dctx.namedPipeCtx.Recv(false, '\n')
if err != nil && err.Error() != named_pipe_ipc.NoMessageMessage {
dctx.log("[supervisor(%d)] [named-pipe-ipc] [err:%v]\n", dctx.Pid, err)
os.Exit(4)
}

if msg == nil {
if ctx.Err() != nil {
return
}
time.Sleep(500 * time.Millisecond)
continue
}

var epm NamedPipeMessage
err = json.Unmarshal(msg, &epm)
if err != nil {
dctx.log("[supervisor(%d)] [named-pipe-ipc] [err:%v]\n", dctx.Pid, err)
}

if epm.Api == PrintInformation {
ret := dctx.Information()
_, err = dctx.namedPipeCtx.Send(named_pipe_ipc.Message(ret + "\n"))
if err != nil {
dctx.log("[supervisor(%d)] [named-pipe-ipc] [send-error:%v]\n", dctx.Pid, err)
}
}
}
}()

err = dctx.namedPipeCtx.Listen('\n')
if err != nil {
dctx.log("[supervisor(%d)] [named-pipe-ipc start failed] [err:%v]\n", dctx.Pid, err)
os.Exit(3)
}
}()
})

// parent process wait child process exit
err = cmd.Wait()
Expand All @@ -272,20 +335,20 @@ func (dctx *Context) Run(ctx context.Context) error {

// start slow
if cost < dctx.MinExitTime {
errNum++
dctx.ErrNum++
} else {
errNum = 0
dctx.ErrNum = 0
}

if dctx.RestoreTime > 0 && cost > dctx.RestoreTime {
isReset = true
count = 1
dctx.Count = 1
}

if err != nil {
dctx.log("[supervisor(%d)] [%s] [heart -pid=%d exit] [%d-worked %v] [err: %v]\n", dctx.Pid, dInfo, dctx.CPid, dctx.CPid, cost, err)
dctx.log("[supervisor(%d)] [%s] [heart -pid=%d exit] [%d-worked %v] [err: %v]\n", dctx.Pid, dctx.Information(), dctx.CPid, dctx.CPid, cost, err)
} else {
dctx.log("[supervisor(%d)] [%s] [heart -pid=%d exit] [%d-worked %v]\n", dctx.Pid, dInfo, dctx.CPid, dctx.CPid, cost)
dctx.log("[supervisor(%d)] [%s] [heart -pid=%d exit] [%d-worked %v]\n", dctx.Pid, dctx.Information(), dctx.CPid, dctx.CPid, cost)
}
}

Expand Down
Binary file added example/named-pipe-ipc
Binary file not shown.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/whiteCcinn/daemon

go 1.15

require github.com/erikdubbelboer/gspt v0.0.0-20201015204752-6cb2489021da // indirect
require (
github.com/erikdubbelboer/gspt v0.0.0-20201015204752-6cb2489021da
github.com/whiteCcinn/named-pipe-ipc v0.0.2
)
10 changes: 10 additions & 0 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ type PipeMessage struct {
Type PipeMessageType
Behavior ProcessBehavior
}

type NamedPipeMessageType int

const (
PrintInformation NamedPipeMessageType = iota + 1
)

type NamedPipeMessage struct {
Api NamedPipeMessageType
}

0 comments on commit 315fa6d

Please sign in to comment.