diff --git a/Bristol/mysql/binlog.go b/Bristol/mysql/binlog.go index f0eb0bc3..d95266ff 100755 --- a/Bristol/mysql/binlog.go +++ b/Bristol/mysql/binlog.go @@ -180,6 +180,8 @@ func (This *BinlogDump) BinlogConnCLose(lock bool) { func() { defer func() { if err := recover(); err != nil { + log.Println("This.mysqlConn.Close err: ", err) + log.Println(string(debug.Stack())) return } }() @@ -197,6 +199,8 @@ func (This *BinlogDump) BinlogConnCLose0(lock bool) { } defer func() { if err := recover(); err != nil { + log.Println("This.mysqlConn.Close err: ", err) + log.Println(string(debug.Stack())) return } }() @@ -288,6 +292,7 @@ func (This *BinlogDump) checkDumpConnection(ctx context.Context, cancelFunc cont defer func() { if err := recover(); err != nil { log.Println("binlog.go checkDumpConnection err:", err) + log.Println(string(debug.Stack())) } }() This.Lock() @@ -366,6 +371,8 @@ func (This *BinlogDump) Start() { func (This *BinlogDump) Close() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() diff --git a/Bristol/mysql/conn_dump.go b/Bristol/mysql/conn_dump.go index becf74e5..97c1e126 100644 --- a/Bristol/mysql/conn_dump.go +++ b/Bristol/mysql/conn_dump.go @@ -13,6 +13,7 @@ import ( func (mc *mysqlConn) DumpBinlog(parser *eventParser, callbackFun callback) (driver.Rows, error) { defer func() { if err := recover(); err != nil { + log.Println(err) log.Println(string(debug.Stack())) log.Println(parser.dataSource, " binlogFileName:", parser.binlogFileName, " binlogPosition:", parser.binlogPosition) parser.callbackErrChan <- fmt.Errorf(fmt.Sprint(err)) @@ -32,6 +33,7 @@ func (mc *mysqlConn) DumpBinlog(parser *eventParser, callbackFun callback) (driv func (mc *mysqlConn) DumpBinlogGtid(parser *eventParser, callbackFun callback) (driver.Rows, error) { defer func() { if err := recover(); err != nil { + log.Println(err) log.Println(string(debug.Stack())) log.Println(parser.dataSource, " binlogFileName:", parser.binlogFileName, " binlogPosition:", parser.binlogPosition) parser.callbackErrChan <- fmt.Errorf(fmt.Sprint(err)) @@ -129,6 +131,7 @@ func (mc *mysqlConn) DumpBinlog0(parser *eventParser, callbackFun callback) (dri defer func() { if err := recover(); err != nil { e = fmt.Errorf("parseEvent err recover err:%s ;lastMapEvent:%T ;binlogFileName:%s ;binlogPosition:%d", fmt.Sprint(err), parser.lastMapEvent, parser.binlogFileName, parser.binlogPosition) + log.Println(e) log.Println(string(debug.Stack())) } }() diff --git a/Bristol/mysql/connection.go b/Bristol/mysql/connection.go index 7f16b103..9a2bb0b5 100755 --- a/Bristol/mysql/connection.go +++ b/Bristol/mysql/connection.go @@ -139,6 +139,10 @@ func (mc *mysqlConn) Begin() (driver.Tx, error) { } func (mc *mysqlConn) Close() (e error) { + if mc.netConn == nil { + return + } + if mc.server.keepalive > 0 { mc.keepaliveTimer.Stop() } diff --git a/Bristol/mysql/event_row.go b/Bristol/mysql/event_row.go index b6f8970e..80bbbcf0 100755 --- a/Bristol/mysql/event_row.go +++ b/Bristol/mysql/event_row.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "log" + "runtime/debug" "strconv" "strings" "time" @@ -644,6 +645,8 @@ func read_datetime2(buf *bytes.Buffer, fsp uint8) (data string, err error) { defer func() { if errs := recover(); errs != nil { err = fmt.Errorf(fmt.Sprint(errs)) + log.Println(err) + log.Println(string(debug.Stack())) return } }() diff --git a/Bristol/mysql/field_type_json_format.go b/Bristol/mysql/field_type_json_format.go index cf34348d..0713c9f1 100644 --- a/Bristol/mysql/field_type_json_format.go +++ b/Bristol/mysql/field_type_json_format.go @@ -48,7 +48,7 @@ func get_field_json_data0(buf *bytes.Buffer,t uint8,length int64) (interface{},e return read_binary_json_object(buf,length-1,large) case JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY: var large bool - if t == JSONB_TYPE_LARGE_OBJECT{ + if t == JSONB_TYPE_LARGE_ARRAY { large = true } return read_binary_json_array(buf,length - 1, large) @@ -189,6 +189,10 @@ func read_binary_json_object(buf *bytes.Buffer,length int64,large bool) (interfa size = int64(sizeSmall) } + if size == 0 { + return nil, nil + } + if size > length { err := fmt.Errorf("Json length: %d is larger than packet length %d",size,length) return nil,err @@ -258,19 +262,16 @@ func read_offset_or_inline(buf *bytes.Buffer,large bool) (data json_object_inlin if large && (data.x == JSONB_TYPE_INT32 || data.x == JSONB_TYPE_UINT32 ) { data.y = nil - z := read_binary_json_type_inlined(buf,data.x, large) - if z == nil{ - data.z = nil - }else{ - z0 := z.(int64) - data.z = &z0 - } + data.z = read_binary_json_type_inlined(buf,data.x, large) return } data.z = nil if large { - binary.Read(buf, binary.LittleEndian, data.y) - }else{ + var y uint32 + binary.Read(buf, binary.LittleEndian, &y) + y0 := int64(y) + data.y = &y0 + } else { var y uint16 binary.Read(buf, binary.LittleEndian, &y) y0 := int64(y) diff --git a/Bristol/mysql/packets.go b/Bristol/mysql/packets.go index 10bc7d06..fa22988a 100755 --- a/Bristol/mysql/packets.go +++ b/Bristol/mysql/packets.go @@ -97,6 +97,10 @@ func (mc *mysqlConn) readLeftPacket(data []byte, pktLen uint64, haveRead uint64) // Read n bytes long number num func (mc *mysqlConn) readNumber(nr uint8) (uint64, error) { + if mc.bufReader == nil { + return 0, errors.New("Can't read number without reader") + } + // Read bytes into array buf := make([]byte, nr) var n, add int diff --git a/Bristol/mysql/parser.go b/Bristol/mysql/parser.go index a75f42b5..29d12726 100644 --- a/Bristol/mysql/parser.go +++ b/Bristol/mysql/parser.go @@ -330,6 +330,8 @@ func (parser *eventParser) ParserConnClose(lock bool) { func() { func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -363,6 +365,8 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, if err := recover(); err != nil { parser.ParserConnClose(false) errs = fmt.Errorf(string(debug.Stack())) + log.Println(err) + log.Println(string(debug.Stack())) } }() if parser.connStatus == STATUS_CLOSED { @@ -587,6 +591,7 @@ func (parser *eventParser) GetConnectionInfo(connectionId string) (m map[string] if err := recover(); err != nil { parser.ParserConnClose(false) log.Println("binlog.go GetConnectionInfo err:", err) + log.Println(string(debug.Stack())) m = nil } parser.binlogDump.Unlock() @@ -632,6 +637,8 @@ func (parser *eventParser) KillConnect(connectionId string) (b bool) { if err := recover(); err != nil { parser.ParserConnClose(false) b = false + log.Println(err) + log.Println(string(debug.Stack())) } parser.binlogDump.Unlock() }() diff --git a/admin/start.go b/admin/start.go index e66f678b..51ba9b30 100644 --- a/admin/start.go +++ b/admin/start.go @@ -27,6 +27,7 @@ import ( func Start() { defer func() { if err := recover(); err != nil { + log.Println(err) debug.PrintStack() } }() diff --git a/input/mongo/input.go b/input/mongo/input.go index 3d339f1b..73194bfd 100644 --- a/input/mongo/input.go +++ b/input/mongo/input.go @@ -90,6 +90,7 @@ func (c *MongoInput) setStatus(status inputDriver.StatusFlag) { func (c *MongoInput) Start(ch chan *inputDriver.PluginStatus) error { defer func() { if err := recover(); err != nil { + log.Println(err) log.Printf("[ERROR] output[%s] panic err:%+v \n", "mongo", string(debug.Stack())) } c.setStatus(inputDriver.CLOSED) diff --git a/input/mysql/mysql.go b/input/mysql/mysql.go index 6e46e79f..9b9f2432 100644 --- a/input/mysql/mysql.go +++ b/input/mysql/mysql.go @@ -1,10 +1,13 @@ package mysql import ( - mysqlDriver "github.com/brokercap/Bifrost/Bristol/mysql" - inputDriver "github.com/brokercap/Bifrost/input/driver" + "fmt" "log" + "runtime/debug" "sync" + + mysqlDriver "github.com/brokercap/Bifrost/Bristol/mysql" + inputDriver "github.com/brokercap/Bifrost/input/driver" ) var MySQLBinlogDump string @@ -87,6 +90,12 @@ func (c *MysqlInput) monitorDump() (r bool) { defer func() { if err := recover(); err != nil { // 上一层 PluginStatusChan 在进程退出之前会被关闭,这里需要无视异常情况 + if fmt.Sprintf("%s", err) == "send on closed channel"{ + return + } + + log.Println(err) + log.Println(string(debug.Stack())) } }() for { diff --git a/input/mysql/schema.go b/input/mysql/schema.go index f7bbbd50..d54638ea 100644 --- a/input/mysql/schema.go +++ b/input/mysql/schema.go @@ -6,6 +6,7 @@ import ( "github.com/brokercap/Bifrost/Bristol/mysql" inputDriver "github.com/brokercap/Bifrost/input/driver" "log" + "runtime/debug" "strings" "time" ) @@ -18,7 +19,8 @@ func (c *MysqlInput) GetConn() mysql.MysqlConnection { func (c *MysqlInput) GetSchemaList() ([]string, error) { defer func() { if err := recover(); err != nil { - + log.Println(err) + log.Println(string(debug.Stack())) } }() db := c.GetConn() @@ -54,7 +56,8 @@ func (c *MysqlInput) GetSchemaList() ([]string, error) { func (c *MysqlInput) GetSchemaTableList(schema string) (tableList []inputDriver.TableList, err error) { defer func() { if err := recover(); err != nil { - + log.Println(err) + log.Println(string(debug.Stack())) } }() db := c.GetConn() @@ -93,7 +96,8 @@ func (c *MysqlInput) GetSchemaTableList(schema string) (tableList []inputDriver. func (c *MysqlInput) GetSchemaTableFieldList(schema string, table string) (FieldList []inputDriver.TableFieldInfo, err error) { defer func() { if err := recover(); err != nil { - + log.Println(err) + log.Println(string(debug.Stack())) } }() db := c.GetConn() @@ -205,6 +209,8 @@ func (c *MysqlInput) CheckPrivilege() (err error) { defer func() { if err0 := recover(); err0 != nil { err = fmt.Errorf("%s", err0) + log.Println(err) + log.Println(string(debug.Stack())) } }() db := c.GetConn() @@ -216,6 +222,8 @@ func (c *MysqlInput) CheckUri(CheckPrivilege bool) (CheckUriResult inputDriver.C defer func() { if err0 := recover(); err0 != nil { err = fmt.Errorf("%s", err0) + log.Println(err) + log.Println(string(debug.Stack())) } }() dbconn := c.GetConn() @@ -277,6 +285,8 @@ func (c *MysqlInput) GetCurrentPosition() (p *inputDriver.PluginPosition, err er defer func() { if err0 := recover(); err0 != nil { err = fmt.Errorf("%s", err0) + log.Println(err) + log.Println(string(debug.Stack())) } }() dbconn := c.GetConn() @@ -307,6 +317,8 @@ func (c *MysqlInput) GetVersion() (Version string, err error) { defer func() { if err0 := recover(); err0 != nil { err = fmt.Errorf("%s", err0) + log.Println(err) + log.Println(string(debug.Stack())) } }() db := c.GetConn() diff --git a/plugin/ActiveMQ/src/activemq.go b/plugin/ActiveMQ/src/activemq.go index 27ad4ef7..b79c3a7d 100755 --- a/plugin/ActiveMQ/src/activemq.go +++ b/plugin/ActiveMQ/src/activemq.go @@ -3,13 +3,16 @@ package src import ( "encoding/json" "fmt" - pluginDriver "github.com/brokercap/Bifrost/plugin/driver" - "github.com/gmallard/stompngo" "log" "net" + "runtime/debug" "strconv" "strings" "time" + + "github.com/gmallard/stompngo" + + pluginDriver "github.com/brokercap/Bifrost/plugin/driver" ) const VERSION = "v1.6.0" @@ -154,6 +157,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err := recover(); err != nil { + log.Println("This.conn.Disconnect err: ", err) + log.Println(string(debug.Stack())) return } }() diff --git a/plugin/Elasticsearch/src/es.go b/plugin/Elasticsearch/src/es.go index e57957ee..790decb8 100755 --- a/plugin/Elasticsearch/src/es.go +++ b/plugin/Elasticsearch/src/es.go @@ -209,6 +209,8 @@ func (This *Conn) Connect() bool { func (This *Conn) ReConnect() bool { defer func() { if err := recover(); err != nil { + log.Println("This.Close or This.Connect err: ", err) + log.Println(string(debug.Stack())) This.err = fmt.Errorf(fmt.Sprint(err)) } }() @@ -221,6 +223,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -301,6 +305,8 @@ func (This *Conn) AutoCommit() (LastSuccessCommitData *pluginDriver.PluginDataTy e = fmt.Errorf(string(debug.Stack())) This.err = e // log.Println(" This.conn.Err:", This.conn.Err) + log.Println(err) + log.Println(string(debug.Stack())) } }() if This.err != nil { diff --git a/plugin/MongoDB/src/mongodb.go b/plugin/MongoDB/src/mongodb.go index 3ae007b6..2ca32668 100755 --- a/plugin/MongoDB/src/mongodb.go +++ b/plugin/MongoDB/src/mongodb.go @@ -117,6 +117,8 @@ func (This *Conn) ReConnect() bool { defer func() { if err := recover();err !=nil{ This.err = fmt.Errorf(fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) } }() This.Close() @@ -128,6 +130,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err :=recover(); err != nil{ + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -192,6 +196,7 @@ func (This *Conn) Insert(data *pluginDriver.PluginDataType,retry bool) (LastSucc LastSuccessCommitData = nil e = fmt.Errorf(string(debug.Stack())) This.err = e + log.Println(err) log.Println(e) return } @@ -233,6 +238,7 @@ func (This *Conn) Del(data *pluginDriver.PluginDataType,retry bool) (LastSuccess LastSuccessCommitData = nil e = fmt.Errorf(string(debug.Stack())) This.err = e + log.Println(err) log.Println(string(debug.Stack())) return } diff --git a/plugin/clickhouse/src/clickhouse.go b/plugin/clickhouse/src/clickhouse.go index 49db276b..3df47ddd 100644 --- a/plugin/clickhouse/src/clickhouse.go +++ b/plugin/clickhouse/src/clickhouse.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" dbDriver "database/sql/driver" "encoding/json" + "errors" "fmt" "log" "runtime/debug" @@ -126,10 +127,12 @@ func (This *Conn) Connect() bool { func (This *Conn) InitVersion() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() - if This.conn == nil { + if This.conn.err != nil { return } versionStr := This.conn.GetVersion() @@ -213,6 +216,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err := recover(); err != nil { + log.Println("This.conn.Close err: ", err) + log.Println(string(debug.Stack())) return } }() @@ -499,6 +504,8 @@ func (This *Conn) initAutoCreateCkTableFieldType(data *pluginDriver.PluginDataTy defer func() { if err0 := recover(); err0 != nil { This.conn.err = fmt.Errorf(fmt.Sprint(err0)) + log.Println(err0) + log.Println(string(debug.Stack())) } }() var err error @@ -555,6 +562,8 @@ func (This *Conn) initCkDatabaseMap() { This.p.ckDatabaseMap = make(map[string]bool, 0) defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -657,6 +666,8 @@ func (This *Conn) Query(data *pluginDriver.PluginDataType, retry bool) (LastSucc defer func() { if err := recover(); err != nil { This.conn.err = fmt.Errorf("ddl exec err:%s", fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) } }() }() @@ -823,6 +834,8 @@ func (This *Conn) AutoCommit() (LastSuccessCommitData *pluginDriver.PluginDataTy if err := recover(); err != nil { e = fmt.Errorf(string(debug.Stack())) This.conn.err = e + log.Println(err) + log.Println(string(debug.Stack())) } }() if This.conn == nil || This.conn.err != nil { @@ -833,7 +846,12 @@ func (This *Conn) AutoCommit() (LastSuccessCommitData *pluginDriver.PluginDataTy return nil, nil, This.conn.err } if This.err != nil { - log.Println("This.err:", This.err) + if errors.Is(This.err, driver.ErrBadConn) { + log.Println(This.err) + } else { + log.Println("This.err:", This.err) + log.Println(string(debug.Stack())) + } } n := len(This.p.Data.Data) if n == 0 { diff --git a/plugin/clickhouse/src/schema.go b/plugin/clickhouse/src/schema.go index a5e00275..c53d6ef6 100644 --- a/plugin/clickhouse/src/schema.go +++ b/plugin/clickhouse/src/schema.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" clickhouse "github.com/ClickHouse/clickhouse-go" "log" + "runtime/debug" ) type ckFieldStruct struct { @@ -40,6 +41,9 @@ func(This *ClickhouseDB) Close() bool{ defer func() { if err := recover();err != nil{ log.Println("clickhouseDB close err:",err) + log.Printf("This: %+v\n", This) + log.Printf("This.conn: %+v\n", This.conn) + log.Println(string(debug.Stack())) } }() if This.conn != nil{ @@ -51,9 +55,12 @@ func(This *ClickhouseDB) Close() bool{ func (This *ClickhouseDB) GetSchemaList() (data []string) { This.conn.Begin() stmt, err := This.conn.Prepare("SHOW DATABASES") - if err == nil{ - defer stmt.Close() + if err != nil { + This.err = err + return } + + defer stmt.Close() rows, err := stmt.Query([]driver.Value{}) if err != nil { This.err = err diff --git a/plugin/clickhouse/src/transfer.go b/plugin/clickhouse/src/transfer.go index 4f0ada97..4133f8d2 100644 --- a/plugin/clickhouse/src/transfer.go +++ b/plugin/clickhouse/src/transfer.go @@ -4,8 +4,10 @@ import ( "encoding/json" "fmt" pluginDriver "github.com/brokercap/Bifrost/plugin/driver" + "log" "reflect" "regexp" + "runtime/debug" "strconv" "strings" "time" @@ -39,6 +41,8 @@ func CkDataTypeTransfer(data interface{}, fieldName string, toDataType string, N defer func() { if err := recover(); err != nil { e = fmt.Errorf(fieldName + " " + fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) } }() switch toDataType { diff --git a/plugin/driver/customer_json_test.go b/plugin/driver/customer_json_test.go index 1f21cba3..11cd6e18 100644 --- a/plugin/driver/customer_json_test.go +++ b/plugin/driver/customer_json_test.go @@ -2,7 +2,9 @@ package driver import ( "encoding/json" + "log" "reflect" + "runtime/debug" "testing" . "github.com/smartystreets/goconvey/convey" @@ -90,6 +92,8 @@ func TestPluginDataCustomerJson_GetMapData(t *testing.T) { defer func() { var result bool = false if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) result = true } So(result, ShouldEqual, true) diff --git a/plugin/kafka/src/kafka.go b/plugin/kafka/src/kafka.go index eb9c8362..7035c872 100644 --- a/plugin/kafka/src/kafka.go +++ b/plugin/kafka/src/kafka.go @@ -19,9 +19,13 @@ package src import ( "encoding/json" "fmt" + "log" + "runtime/debug" + "time" + "github.com/Shopify/sarama" + pluginDriver "github.com/brokercap/Bifrost/plugin/driver" - "time" ) const VERSION = "v2.0.5" @@ -174,6 +178,8 @@ func (This *Conn) ReConnect() bool { func() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -194,6 +200,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() diff --git a/plugin/memcache/src/memcache.go b/plugin/memcache/src/memcache.go index 93ecc310..051716e6 100644 --- a/plugin/memcache/src/memcache.go +++ b/plugin/memcache/src/memcache.go @@ -1,10 +1,14 @@ package src import ( - "github.com/brokercap/Bifrost/plugin/driver" - "github.com/bradfitz/gomemcache/memcache" - "fmt" "encoding/json" + "fmt" + "log" + "runtime/debug" + + "github.com/bradfitz/gomemcache/memcache" + + "github.com/brokercap/Bifrost/plugin/driver" ) const VERSION = "v1.6.0" @@ -105,6 +109,8 @@ func (This *Conn) ReConnect() bool { defer func() { if err := recover();err !=nil{ This.err = fmt.Errorf(fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) } }() This.Connect() @@ -170,4 +176,4 @@ func (This *Conn) Query(data *driver.PluginDataType,retry bool) (*driver.PluginD func (This *Conn) Commit(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error){ return data,nil,nil -} \ No newline at end of file +} diff --git a/plugin/mysql/src/mysql.go b/plugin/mysql/src/mysql.go index 555dc0a9..2614683e 100644 --- a/plugin/mysql/src/mysql.go +++ b/plugin/mysql/src/mysql.go @@ -188,6 +188,7 @@ func (This *Conn) SetParam(p interface{}) (interface{}, error) { func (This *Conn) initToMysqlTableFieldType() { defer func() { if err := recover(); err != nil { + log.Println(err) log.Println(string(debug.Stack())) This.conn.err = fmt.Errorf(string(debug.Stack())) } @@ -293,6 +294,7 @@ func (This *Conn) CreateTableAndGetTableFieldsType(data *pluginDriver.PluginData func (This *Conn) getAutoTableFieldType(data *pluginDriver.PluginDataType) (*PluginParam0, error) { defer func() { if err := recover(); err != nil { + log.Println(err) log.Println(string(debug.Stack())) This.conn.err = fmt.Errorf(string(debug.Stack())) } @@ -396,6 +398,8 @@ func (This *Conn) initToDatabaseMap() { This.p.toDatabaseMap = make(map[string]bool, 0) defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -435,6 +439,8 @@ func (This *Conn) ReConnect() bool { if err := recover(); err != nil { This.conn.err = fmt.Errorf(fmt.Sprint(err) + " debug:" + string(debug.Stack())) This.err = This.conn.err + log.Println(err) + log.Println(string(debug.Stack())) } }() if This.conn != nil { @@ -455,6 +461,8 @@ func (This *Conn) StmtClose() { defer func() { if err := recover(); err != nil { This.conn.err = fmt.Errorf("StmtClose err:%s", fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -470,6 +478,8 @@ func (This *Conn) Close() bool { func() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -616,6 +626,7 @@ func (This *Conn) AutoCommit() (LastSuccessCommitData *pluginDriver.PluginDataTy defer func() { if err := recover(); err != nil { e = fmt.Errorf(string(debug.Stack())) + log.Println(err) log.Println(string(debug.Stack())) This.conn.err = e This.err = e diff --git a/plugin/mysql/src/schema.go b/plugin/mysql/src/schema.go index a5be320e..3da48d16 100644 --- a/plugin/mysql/src/schema.go +++ b/plugin/mysql/src/schema.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/brokercap/Bifrost/Bristol/mysql" "log" + "runtime/debug" "strconv" ) @@ -26,6 +27,7 @@ func (This *mysqlDB) Open() (b bool) { defer func() { if err := recover(); err != nil { log.Printf("[ERROR] output[%s] mysqlDB Open err:%+v \n", OutputName, err) + log.Println(string(debug.Stack())) This.err = fmt.Errorf(fmt.Sprint(err)) b = false } @@ -38,6 +40,7 @@ func (This *mysqlDB) Close() bool { defer func() { if err := recover(); err != nil { log.Printf("[ERROR] output[%s] mysqlDB Close err:%+v \n", OutputName, err) + log.Println(string(debug.Stack())) } }() if This.conn != nil { diff --git a/plugin/mysql/src/starrocks.go b/plugin/mysql/src/starrocks.go index 2b3d49f1..a0f25f4e 100644 --- a/plugin/mysql/src/starrocks.go +++ b/plugin/mysql/src/starrocks.go @@ -19,6 +19,7 @@ func (This *Conn) GetStarRocksBeCount() int { func (This *Conn) initIsStarrock() { defer func() { if err := recover(); err != nil { + log.Println(err) log.Printf("[ERROR] output[%s] initIsStarrock recover:%+v \n", OutputName, string(debug.Stack())) return } diff --git a/plugin/pool.go b/plugin/pool.go index b2461375..8ed8dce2 100644 --- a/plugin/pool.go +++ b/plugin/pool.go @@ -146,6 +146,7 @@ func BackPlugin(ToServerConn *ToServerConn) bool { func(){ defer func() { if err := recover();err != nil{ + log.Println(err) log.Println(string(debug.Stack())) return } @@ -166,4 +167,4 @@ func BackPlugin(ToServerConn *ToServerConn) bool { } t.Unlock() return true -} \ No newline at end of file +} diff --git a/plugin/rabbitmq/src/rabbitmq.go b/plugin/rabbitmq/src/rabbitmq.go index 7064d9db..74504fe5 100644 --- a/plugin/rabbitmq/src/rabbitmq.go +++ b/plugin/rabbitmq/src/rabbitmq.go @@ -1,12 +1,15 @@ package src import ( - pluginDriver "github.com/brokercap/Bifrost/plugin/driver" - "github.com/streadway/amqp" - "strconv" "encoding/json" "fmt" "log" + "runtime/debug" + "strconv" + + "github.com/streadway/amqp" + + pluginDriver "github.com/brokercap/Bifrost/plugin/driver" ) const VERSION = "v1.6.0" @@ -143,6 +146,7 @@ func (This *Conn) Close() bool { defer func(){ if err := recover();err != nil{ log.Println("ReConnect recory:",err) + log.Println(string(debug.Stack())) return } }() @@ -299,4 +303,4 @@ func (This *Conn) sendToList(data *pluginDriver.PluginDataType) (*pluginDriver.P func (This *Conn) TimeOutCommit() (*pluginDriver.PluginDataType,*pluginDriver.PluginDataType,error) { return nil,nil,nil -} \ No newline at end of file +} diff --git a/plugin/redis/src/redis.go b/plugin/redis/src/redis.go index e0eba78f..66fa57ed 100644 --- a/plugin/redis/src/redis.go +++ b/plugin/redis/src/redis.go @@ -1,16 +1,20 @@ package src import ( + "context" "encoding/json" "errors" "fmt" - "github.com/brokercap/Bifrost/plugin/driver" - "context" - //"github.com/go-redis/redis" - "github.com/go-redis/redis/v8" + "log" + "runtime/debug" "strconv" "strings" "time" + + "github.com/brokercap/Bifrost/plugin/driver" + + //"github.com/go-redis/redis" + "github.com/go-redis/redis/v8" ) const VERSION = "v1.7.4" @@ -167,6 +171,8 @@ func (This *Conn) ReConnect() bool { defer func() { if err := recover();err !=nil{ This.err = fmt.Errorf(fmt.Sprint(err)) + log.Println(err) + log.Println(string(debug.Stack())) } }() if This.conn != nil { @@ -292,4 +298,4 @@ func (This *Conn) Commit(data *driver.PluginDataType,retry bool) (LastSuccessCom } } return data,nil,nil -} \ No newline at end of file +} diff --git a/server/db.go b/server/db.go index ed5585b3..a9de60ab 100755 --- a/server/db.go +++ b/server/db.go @@ -572,7 +572,7 @@ func (db *db) monitorDump() (r bool) { warning.AppendWarning(warning.WarningContent{ Type: warning.WARNINGERROR, DbName: db.Name, - Body: fmt.Sprintf("err:%s; last status:%s", inputStatusInfo.Error, lastStatus), + Body: fmt.Sprintf("err:%v; last status:%s", inputStatusInfo.Error, lastStatus), }) } diff --git a/server/history/select.go b/server/history/select.go index 4ec93463..11016aca 100644 --- a/server/history/select.go +++ b/server/history/select.go @@ -39,6 +39,8 @@ func (This *History) threadStart(i int,wg *sync.WaitGroup) { defer func() { defer func() { if err:=recover();err!=nil{ + log.Println(err) + log.Println(string(debug.Stack())) return } }() @@ -308,4 +310,4 @@ func (This *History) GetNextSql() (sql string,start uint64){ sql = "SELECT * FROM `" + This.SchemaName + "`.`" + This.CurrentTableName + "` " + where } return -} \ No newline at end of file +} diff --git a/server/recovery.go b/server/recovery.go index 76896e3f..90fa190f 100755 --- a/server/recovery.go +++ b/server/recovery.go @@ -526,6 +526,8 @@ func StopAllChannel() { func() { defer func() { if err := recover(); err != nil { + log.Println(err) + log.Println(string(debug.Stack())) return } }() diff --git a/server/server_save_recovery.go b/server/server_save_recovery.go index 319a3804..1b7d2192 100644 --- a/server/server_save_recovery.go +++ b/server/server_save_recovery.go @@ -2,14 +2,16 @@ package server import ( "encoding/json" + "log" + "runtime/debug" + "sync" + "time" + "github.com/brokercap/Bifrost/config" "github.com/brokercap/Bifrost/plugin" "github.com/brokercap/Bifrost/server/storage" "github.com/brokercap/Bifrost/server/user" "github.com/brokercap/Bifrost/server/warning" - "log" - "sync" - "time" ) var l sync.RWMutex @@ -73,6 +75,7 @@ func GetSnapshotData() ([]byte, error) { l.Unlock() if err := recover(); err != nil { log.Println(err) + log.Println(string(debug.Stack())) } }() data := recoveryDataSturct{ @@ -93,6 +96,7 @@ func GetSnapshotData2() ([]byte, error) { l.Unlock() if err := recover(); err != nil { log.Println(err) + log.Println(string(debug.Stack())) } }() data := recoveryDataSturct{ diff --git a/server/to_server_consume.go b/server/to_server_consume.go index f190e854..1a402172 100644 --- a/server/to_server_consume.go +++ b/server/to_server_consume.go @@ -590,6 +590,7 @@ func (This *ToServer) getPluginAndSetParam(MyConsumerId int) (PluginConn *plugin func (This *ToServer) timeOutCommit(MyConsumerId int) ( LastSuccessCommitData *pluginDriver.PluginDataType,ErrData *pluginDriver.PluginDataType, err error) { defer func() { if err2 := recover();err2 != nil { + log.Println(err2) err = fmt.Errorf("ToServer:%s Commit Debug Err:%s",This.ToServerKey,string(debug.Stack())) log.Println(This.ToServerKey,"sendToServer err:",err) } @@ -610,6 +611,7 @@ func (This *ToServer) timeOutCommit(MyConsumerId int) ( LastSuccessCommitData *p func (This *ToServer) SkipBinlog(MyConsumerId int,SkipErrData *pluginDriver.PluginDataType) (err error){ defer func() { if err2 := recover();err2 != nil { + log.Println(err2) err = fmt.Errorf("ToServer:%s Commit Debug Err:%s",This.ToServerKey,string(debug.Stack())) log.Println(This.ToServerKey,"sendToServer err:",err) } @@ -668,4 +670,4 @@ func (This *ToServer) sendToServer(paramData *pluginDriver.PluginDataType,MyCons break } return -} \ No newline at end of file +} diff --git a/server/warning/init.go b/server/warning/init.go index b89dfbf7..bf250180 100644 --- a/server/warning/init.go +++ b/server/warning/init.go @@ -118,6 +118,7 @@ func consumeWarning(){ func sendToWaring(config WaringConfig,title,c string,n int){ defer func() { if err:=recover();err!=nil{ + log.Println(err) log.Println(string(debug.Stack())) } }() @@ -141,6 +142,7 @@ func sendToWaring(config WaringConfig,title,c string,n int){ func CheckWarngConfigBySendTest(config WaringConfig,c string) error{ defer func() { if err:=recover();err!=nil{ + log.Println(err) log.Println(string(debug.Stack())) } }() @@ -150,4 +152,4 @@ func CheckWarngConfigBySendTest(config WaringConfig,c string) error{ title := "Bifrost warning test" err := dirverMap[config.Type].SendWarning(config.Param,title,c) return err -} \ No newline at end of file +} diff --git a/xdb/driver/driver.go b/xdb/driver/driver.go index 26a31dfe..7e5338ce 100644 --- a/xdb/driver/driver.go +++ b/xdb/driver/driver.go @@ -1,10 +1,11 @@ package driver import ( - "log" - "sync" "encoding/json" "fmt" + "log" + "runtime/debug" + "sync" ) var ( @@ -39,6 +40,7 @@ func Register(name string, driver Driver,version string) { defer func() { if err := recover();err!=nil{ log.Println(err) + log.Println(string(debug.Stack())) } }() driversMu.Lock() @@ -77,4 +79,4 @@ func Open(name string,uri string) (XdbDriver,error){ return nil,fmt.Errorf(name+" not exsit") } return drivers[name].driver.Open(uri) -} \ No newline at end of file +} diff --git a/xdb/pool.go b/xdb/pool.go index 431104ce..e571a39d 100644 --- a/xdb/pool.go +++ b/xdb/pool.go @@ -81,6 +81,7 @@ func GetClient(name string) (c *Client,err error) { func BackCient(name string,c *Client) bool { defer func() { if err := recover();err !=nil{ + log.Println(err) log.Println(string(debug.Stack())) return } @@ -95,6 +96,7 @@ func BackCient(name string,c *Client) bool { func(){ defer func() { if err := recover();err != nil{ + log.Println(err) log.Println(string(debug.Stack())) return } @@ -108,4 +110,4 @@ func BackCient(name string,c *Client) bool { } t.Unlock() return true -} \ No newline at end of file +}