diff --git a/core/common.go b/core/common.go index 8353ab3..c3ce21c 100644 --- a/core/common.go +++ b/core/common.go @@ -137,8 +137,7 @@ func netInfo() *NetInfo { continue } rsp := NetInfo{} - err = json.Unmarshal(buf[:n], &rsp) - if err != nil { + if err = json.Unmarshal(buf[:n], &rsp); err != nil { gLog.Printf(LvERROR, "wrong NetInfo:%s", err) continue } diff --git a/core/config.go b/core/config.go index 24f24c3..0ade7ea 100644 --- a/core/config.go +++ b/core/config.go @@ -3,6 +3,7 @@ package openp2p import ( "encoding/json" "flag" + "fmt" "io/ioutil" "os" "sync" @@ -41,6 +42,10 @@ type AppConfig struct { isUnderlayServer int // TODO: bool? } +func (c *AppConfig) ID() string { + return fmt.Sprintf("%s%d", c.Protocol, c.SrcPort) +} + // TODO: add loglevel, maxlogfilesize type Config struct { Network NetworkConfig `json:"network"` diff --git a/core/handlepush.go b/core/handlepush.go index 9b55a9f..d005500 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "time" "github.com/openp2p-cn/totp" @@ -22,62 +23,10 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) switch subType { case MsgPushConnectReq: // TODO: handle a msg move to a new function - req := PushConnectReq{} - err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err) - return err - } - gLog.Printf(LvDEBUG, "%s is connecting...", req.From) - gLog.Println(LvDEBUG, "push connect response to ", req.From) - if compareVersion(req.Version, LeastSupportVersion) == LESS { - gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From) - rsp := PushConnectRsp{ - Error: 10, - Detail: ErrVersionNotCompatible.Error(), - To: req.From, - From: pn.config.Node, - } - pn.push(req.From, MsgPushConnectRsp, rsp) - return ErrVersionNotCompatible - } - // verify totp token or token - t := totp.TOTP{Step: totp.RelayTOTPStep} - if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt) { // localTs may behind, auto adjust ts - gLog.Printf(LvINFO, "Access Granted\n") - config := AppConfig{} - config.peerNatType = req.NatType - config.peerConeNatPort = req.ConeNatPort - config.peerIP = req.FromIP - config.PeerNode = req.From - config.peerVersion = req.Version - config.fromToken = req.Token - config.peerIPv6 = req.IPv6 - config.hasIPv4 = req.HasIPv4 - config.hasUPNPorNATPMP = req.HasUPNPorNATPMP - config.linkMode = req.LinkMode - config.isUnderlayServer = req.IsUnderlayServer - // share relay node will limit bandwidth - if req.Token != pn.config.Token { - gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth) - config.shareBandwidth = pn.config.ShareBandwidth - } - // go pn.AddTunnel(config, req.ID) - go pn.addDirectTunnel(config, req.ID) - break - } - gLog.Println(LvERROR, "Access Denied:", req.From) - rsp := PushConnectRsp{ - Error: 1, - Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node), - To: req.From, - From: pn.config.Node, - } - pn.push(req.From, MsgPushConnectRsp, rsp) + err = handleConnectReq(pn, subType, msg) case MsgPushRsp: rsp := PushRsp{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp) - if err != nil { + if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil { gLog.Printf(LvERROR, "wrong pushRsp:%s", err) return err } @@ -88,9 +37,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } case MsgPushAddRelayTunnelReq: req := AddRelayTunnelReq{} - err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) - if err != nil { - gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) + if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) return err } config := AppConfig{} @@ -106,9 +54,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { }(req) case MsgPushAPPKey: req := APPKeySync{} - err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) - if err != nil { - gLog.Printf(LvERROR, "wrong APPKeySync:%s", err) + if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) return err } SaveKey(req.AppID, req.AppKey) @@ -133,123 +80,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { os.Exit(0) return err case MsgPushReportApps: - gLog.Println(LvINFO, "MsgPushReportApps") - req := ReportApps{} - gConf.mtx.Lock() - defer gConf.mtx.Unlock() - for _, config := range gConf.Apps { - appActive := 0 - relayNode := "" - relayMode := "" - linkMode := LinkModeUDPPunch - i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) - if ok { - app := i.(*p2pApp) - if app.isActive() { - appActive = 1 - } - relayNode = app.relayNode - relayMode = app.relayMode - linkMode = app.tunnel.linkModeWeb - } - appInfo := AppInfo{ - AppName: config.AppName, - Error: config.errMsg, - Protocol: config.Protocol, - SrcPort: config.SrcPort, - RelayNode: relayNode, - RelayMode: relayMode, - LinkMode: linkMode, - PeerNode: config.PeerNode, - DstHost: config.DstHost, - DstPort: config.DstPort, - PeerUser: config.PeerUser, - PeerIP: config.peerIP, - PeerNatType: config.peerNatType, - RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"), - ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"), - IsActive: appActive, - Enabled: config.Enabled, - } - req.Apps = append(req.Apps, appInfo) - } - pn.write(MsgReport, MsgReportApps, &req) + err = handleReportApps(pn, subType, msg) case MsgPushReportLog: - gLog.Println(LvDEBUG, "MsgPushReportLog") - req := ReportLogReq{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &req) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushReportLog:%s %s", err, string(msg[openP2PHeaderSize:])) - return err - } - if req.FileName == "" { - req.FileName = "openp2p.log" - } - f, err := os.Open(filepath.Join("log", req.FileName)) - if err != nil { - gLog.Println(LvERROR, "read log file error:", err) - break - } - fi, err := f.Stat() - if err != nil { - break - } - if req.Offset == 0 && fi.Size() > 4096 { - req.Offset = fi.Size() - 4096 - } - if req.Len <= 0 { - req.Len = 4096 - } - f.Seek(req.Offset, 0) - if req.Len > 1024*1024 { // too large - break - } - buff := make([]byte, req.Len) - readLength, err := f.Read(buff) - f.Close() - if err != nil { - gLog.Println(LvERROR, "read log content error:", err) - break - } - rsp := ReportLogRsp{} - rsp.Content = string(buff[:readLength]) - rsp.FileName = req.FileName - rsp.Total = fi.Size() - rsp.Len = req.Len - pn.write(MsgReport, MsgPushReportLog, &rsp) + err = handleLog(pn, subType, msg) case MsgPushEditApp: - gLog.Println(LvINFO, "MsgPushEditApp") - newApp := AppInfo{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:])) - return err - } - oldConf := AppConfig{Enabled: 1} - // protocol0+srcPort0 exist, delApp - oldConf.AppName = newApp.AppName - oldConf.Protocol = newApp.Protocol0 - oldConf.SrcPort = newApp.SrcPort0 - oldConf.PeerNode = newApp.PeerNode - oldConf.DstHost = newApp.DstHost - oldConf.DstPort = newApp.DstPort - - gConf.delete(oldConf) - // AddApp - newConf := oldConf - newConf.Protocol = newApp.Protocol - newConf.SrcPort = newApp.SrcPort - gConf.add(newConf, false) - pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end - // autoReconnect will auto AddApp - // pn.AddApp(config) - // TODO: report result + err = handleEditApp(pn, subType, msg) case MsgPushEditNode: gLog.Println(LvINFO, "MsgPushEditNode") req := EditNode{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &req) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:])) + if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } gConf.setNode(req.NewName) @@ -259,9 +99,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { case MsgPushSwitchApp: gLog.Println(LvINFO, "MsgPushSwitchApp") app := AppInfo{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &app) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:])) + if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:])) return err } config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol} @@ -273,19 +112,193 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } case MsgPushDstNodeOnline: gLog.Println(LvINFO, "MsgPushDstNodeOnline") - app := PushDstNodeOnline{} - err := json.Unmarshal(msg[openP2PHeaderSize:], &app) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:])) + req := PushDstNodeOnline{} + if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } - gLog.Println(LvINFO, "retry peerNode ", app.Node) - gConf.retryApp(app.Node) + gLog.Println(LvINFO, "retry peerNode ", req.Node) + gConf.retryApp(req.Node) default: pn.msgMapMtx.Lock() ch := pn.msgMap[pushHead.From] pn.msgMapMtx.Unlock() ch <- msg } - return nil + return err +} + +func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) { + gLog.Println(LvINFO, "MsgPushEditApp") + newApp := AppInfo{} + if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:])) + return err + } + oldConf := AppConfig{Enabled: 1} + // protocol0+srcPort0 exist, delApp + oldConf.AppName = newApp.AppName + oldConf.Protocol = newApp.Protocol0 + oldConf.SrcPort = newApp.SrcPort0 + oldConf.PeerNode = newApp.PeerNode + oldConf.DstHost = newApp.DstHost + oldConf.DstPort = newApp.DstPort + + gConf.delete(oldConf) + // AddApp + newConf := oldConf + newConf.Protocol = newApp.Protocol + newConf.SrcPort = newApp.SrcPort + gConf.add(newConf, false) + pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end + return nil + // autoReconnect will auto AddApp + // pn.AddApp(config) + // TODO: report result +} + +func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) { + req := PushConnectReq{} + if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) + return err + } + gLog.Printf(LvDEBUG, "%s is connecting...", req.From) + gLog.Println(LvDEBUG, "push connect response to ", req.From) + if compareVersion(req.Version, LeastSupportVersion) == LESS { + gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From) + rsp := PushConnectRsp{ + Error: 10, + Detail: ErrVersionNotCompatible.Error(), + To: req.From, + From: pn.config.Node, + } + pn.push(req.From, MsgPushConnectRsp, rsp) + return ErrVersionNotCompatible + } + // verify totp token or token + t := totp.TOTP{Step: totp.RelayTOTPStep} + if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt/int64(time.Second)) { // localTs may behind, auto adjust ts + gLog.Printf(LvINFO, "Access Granted\n") + config := AppConfig{} + config.peerNatType = req.NatType + config.peerConeNatPort = req.ConeNatPort + config.peerIP = req.FromIP + config.PeerNode = req.From + config.peerVersion = req.Version + config.fromToken = req.Token + config.peerIPv6 = req.IPv6 + config.hasIPv4 = req.HasIPv4 + config.hasUPNPorNATPMP = req.HasUPNPorNATPMP + config.linkMode = req.LinkMode + config.isUnderlayServer = req.IsUnderlayServer + // share relay node will limit bandwidth + if req.Token != pn.config.Token { + gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth) + config.shareBandwidth = pn.config.ShareBandwidth + } + // go pn.AddTunnel(config, req.ID) + go pn.addDirectTunnel(config, req.ID) + return nil + } + gLog.Println(LvERROR, "Access Denied:", req.From) + rsp := PushConnectRsp{ + Error: 1, + Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node), + To: req.From, + From: pn.config.Node, + } + return pn.push(req.From, MsgPushConnectRsp, rsp) +} + +func handleReportApps(pn *P2PNetwork, subType uint16, msg []byte) (err error) { + gLog.Println(LvINFO, "MsgPushReportApps") + req := ReportApps{} + gConf.mtx.Lock() + defer gConf.mtx.Unlock() + for _, config := range gConf.Apps { + appActive := 0 + relayNode := "" + relayMode := "" + linkMode := LinkModeUDPPunch + i, ok := pn.apps.Load(config.ID()) + if ok { + app := i.(*p2pApp) + if app.isActive() { + appActive = 1 + } + relayNode = app.relayNode + relayMode = app.relayMode + linkMode = app.tunnel.linkModeWeb + } + appInfo := AppInfo{ + AppName: config.AppName, + Error: config.errMsg, + Protocol: config.Protocol, + SrcPort: config.SrcPort, + RelayNode: relayNode, + RelayMode: relayMode, + LinkMode: linkMode, + PeerNode: config.PeerNode, + DstHost: config.DstHost, + DstPort: config.DstPort, + PeerUser: config.PeerUser, + PeerIP: config.peerIP, + PeerNatType: config.peerNatType, + RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"), + ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"), + IsActive: appActive, + Enabled: config.Enabled, + } + req.Apps = append(req.Apps, appInfo) + } + return pn.write(MsgReport, MsgReportApps, &req) +} + +func handleLog(pn *P2PNetwork, subType uint16, msg []byte) (err error) { + gLog.Println(LvDEBUG, "MsgPushReportLog") + const defaultLen = 1024 * 128 + const maxLen = 1024 * 1024 + req := ReportLogReq{} + if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) + return err + } + if req.FileName == "" { + req.FileName = "openp2p.log" + } + f, err := os.Open(filepath.Join("log", req.FileName)) + if err != nil { + gLog.Println(LvERROR, "read log file error:", err) + return err + } + fi, err := f.Stat() + if err != nil { + return err + } + if req.Offset > fi.Size() { + req.Offset = fi.Size() - defaultLen + } + // verify input parameters + if req.Offset < 0 { + req.Offset = 0 + } + if req.Len <= 0 || req.Len > maxLen { + req.Len = defaultLen + } + + f.Seek(req.Offset, 0) + buff := make([]byte, req.Len) + readLength, err := f.Read(buff) + f.Close() + if err != nil { + gLog.Println(LvERROR, "read log content error:", err) + return err + } + rsp := ReportLogRsp{} + rsp.Content = string(buff[:readLength]) + rsp.FileName = req.FileName + rsp.Total = fi.Size() + rsp.Len = req.Len + return pn.write(MsgReport, MsgPushReportLog, &rsp) } diff --git a/core/nat.go b/core/nat.go index 6912e0c..eab5574 100644 --- a/core/nat.go +++ b/core/nat.go @@ -84,7 +84,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string, return "", 0, err } natRsp := NatDetectRsp{} - err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp) + json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp) return natRsp.IP, natRsp.Port, nil } @@ -121,6 +121,7 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort}) if err != nil { gLog.Println(LvERROR, "echo server listen error:", err) + wg.Done() return } buf := make([]byte, 1600) @@ -135,7 +136,10 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP echoConn.WriteToUDP(buf[0:n], addr) gLog.Println(LvDEBUG, "echo server end") }() - wg.Wait() // wait echo udp + wg.Wait() // wait echo udp + if echoConn == nil { // listen error + return + } defer echoConn.Close() // testing for public ip for i := 0; i < 2; i++ { diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index 7945a9f..67b5210 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -10,6 +10,7 @@ import ( "math/rand" "net/http" "net/url" + "reflect" "strings" "sync" "time" @@ -112,30 +113,18 @@ func (pn *P2PNetwork) runAll() { allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps for _, config := range allApps { - if config.nextRetryTime.After(time.Now()) { - continue - } - if config.Enabled == 0 { + if config.nextRetryTime.After(time.Now()) || config.Enabled == 0 || config.retryNum >= retryLimit { continue } if config.AppName == "" { - config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort) + config.AppName = config.ID() } - appExist := false - i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) - if ok { - app := i.(*p2pApp) - appExist = true - if app.isActive() { + if i, ok := pn.apps.Load(config.ID()); ok { + if app := i.(*p2pApp); app.isActive() { continue } - } - if appExist { pn.DeleteApp(*config) } - if config.retryNum >= retryLimit { - continue - } if config.retryNum > 0 { // first time not show reconnect log gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum) @@ -160,6 +149,7 @@ func (pn *P2PNetwork) runAll() { } } } + func (pn *P2PNetwork) autorunApp() { gLog.Println(LvINFO, "autorunApp start") pn.wgReconnect.Add(1) @@ -181,9 +171,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri return nil, 0, "", errors.New("read MsgRelayNodeRsp error") } rsp := RelayNodeRsp{} - err := json.Unmarshal(body, &rsp) - if err != nil { - gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) + if err := json.Unmarshal(body, &rsp); err != nil { return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") } if rsp.RelayName == "" || rsp.RelayToken == 0 { @@ -216,9 +204,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error") } rspID := TunnelMsg{} - err = json.Unmarshal(body, &rspID) - if err != nil { - gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) + if err = json.Unmarshal(body, &rspID); err != nil { return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") } return t, rspID.ID, rsp.Mode, err @@ -233,7 +219,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { } // check if app already exist? appExist := false - _, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + _, ok := pn.apps.Load(config.ID()) if ok { appExist = true } @@ -303,7 +289,8 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { relayNode: relayNode, relayMode: relayMode, hbTime: time.Now()} - pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app) + pn.apps.Store(config.ID(), &app) + gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id) if err == nil { go app.listen() } @@ -314,16 +301,37 @@ func (pn *P2PNetwork) DeleteApp(config AppConfig) { gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort) defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort) // close the apps of this config - i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + i, ok := pn.apps.Load(config.ID()) if ok { app := i.(*p2pApp) - gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + gLog.Printf(LvINFO, "app %s exist, delete it", app.config.AppName) app.close() - pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + pn.apps.Delete(config.ID()) } } -func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) { +func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) { + // find existing tunnel to peer + pn.allTunnels.Range(func(id, i interface{}) bool { + tmpt := i.(*P2PTunnel) + if tmpt.config.PeerNode == config.PeerNode { + gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode) + isActive := tmpt.checkActive() + // inactive, close it + if !isActive { + gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode) + tmpt.close() + } else { + t = tmpt + } + return false + } + return true + }) + return t +} + +func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) { gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) isClient := false @@ -332,64 +340,34 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, tid = rand.Uint64() isClient = true } - exist := false - // find existing tunnel to peer - var t *P2PTunnel - pn.allTunnels.Range(func(id, i interface{}) bool { - t = i.(*P2PTunnel) - if t.config.PeerNode == config.PeerNode { - // server side force close existing tunnel - if !isClient { - t.close() - return false - } - // client side checking - gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode) - isActive := t.checkActive() - // inactive, close it - if !isActive { - gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode) - t.close() - } else { - // active - exist = true - } - return false - } - return true - }) - if exist { + if t = pn.findTunnel(&config); t != nil { return t, nil } // create tunnel if not exist - t = &P2PTunnel{pn: pn, - config: config, - id: tid, - } + pn.msgMapMtx.Lock() pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50) pn.msgMapMtx.Unlock() // server side if !isClient { - err := pn.newTunnel(t, tid, isClient) + t, err = pn.newTunnel(config, tid, isClient) return t, err // always return } // client side // peer info - initErr := t.requestPeerInfo() + initErr := pn.requestPeerInfo(&config) if initErr != nil { gLog.Println(LvERROR, "init error:", initErr) return nil, initErr } - var err error // try TCP6 - if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) { + if IsIPv6(config.peerIPv6) && IsIPv6(pn.config.publicIPv6) { gLog.Println(LvINFO, "try TCP6") - t.config.linkMode = LinkModeTCP6 - t.config.isUnderlayServer = 0 - if err = pn.newTunnel(t, tid, isClient); err == nil { + config.linkMode = LinkModeTCP6 + config.isUnderlayServer = 0 + if t, err = pn.newTunnel(config, tid, isClient); err == nil { return t, nil } } @@ -397,59 +375,74 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, // TODO: try UDP6 // try TCP4 - if t.config.hasIPv4 == 1 || t.pn.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 || t.pn.config.hasUPNPorNATPMP == 1 { + if config.hasIPv4 == 1 || pn.config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 || pn.config.hasUPNPorNATPMP == 1 { gLog.Println(LvINFO, "try TCP4") - t.config.linkMode = LinkModeTCP4 - if t.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 { - t.config.isUnderlayServer = 0 + config.linkMode = LinkModeTCP4 + if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 { + config.isUnderlayServer = 0 } else { - t.config.isUnderlayServer = 1 + config.isUnderlayServer = 1 } - if err = pn.newTunnel(t, tid, isClient); err == nil { + if t, err = pn.newTunnel(config, tid, isClient); err == nil { return t, nil } } // TODO: try UDP4 // try TCPPunch - if t.config.peerNatType == NATCone && t.pn.config.natType == NATCone { // TODO: support c2s - gLog.Println(LvINFO, "try TCP4 Punch") - t.config.linkMode = LinkModeTCPPunch - t.config.isUnderlayServer = 0 - if err = pn.newTunnel(t, tid, isClient); err == nil { - gLog.Println(LvINFO, "TCP4 Punch ok") - return t, nil + for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries + if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s + gLog.Println(LvINFO, "try TCP4 Punch") + config.linkMode = LinkModeTCPPunch + config.isUnderlayServer = 0 + if t, err = pn.newTunnel(config, tid, isClient); err == nil { + gLog.Println(LvINFO, "TCP4 Punch ok") + return t, nil + } } } + // try UDPPunch - if t.config.peerNatType == NATCone || t.pn.config.natType == NATCone { - gLog.Println(LvINFO, "try UDP4 Punch") - t.config.linkMode = LinkModeUDPPunch - t.config.isUnderlayServer = 0 - if err = pn.newTunnel(t, tid, isClient); err == nil { - return t, nil + for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries + if config.peerNatType == NATCone || pn.config.natType == NATCone { + gLog.Println(LvINFO, "try UDP4 Punch") + config.linkMode = LinkModeUDPPunch + config.isUnderlayServer = 0 + if t, err = pn.newTunnel(config, tid, isClient); err == nil { + return t, nil + } + } + if !(config.peerNatType == NATCone && pn.config.natType == NATCone) { // not cone2cone, no more try + break } } return nil, ErrorHandshake // only ErrorHandshake will try relay } -func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error { +func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) { + if existTunnel := pn.findTunnel(&config); existTunnel != nil { + return existTunnel, nil + } + t = &P2PTunnel{pn: pn, + config: config, + id: tid, + } t.initPort() if isClient { - if err := t.connect(); err != nil { + if err = t.connect(); err != nil { gLog.Println(LvERROR, "p2pTunnel connect error:", err) - return err + return } } else { - if err := t.listen(); err != nil { + if err = t.listen(); err != nil { gLog.Println(LvERROR, "p2pTunnel listen error:", err) - return err + return } } // store it when success gLog.Printf(LvDEBUG, "store tunnel %d", tid) pn.allTunnels.Store(tid, t) - return nil + return } func (pn *P2PNetwork) init() error { gLog.Println(LvINFO, "init start") @@ -552,9 +545,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { case MsgLogin: // gLog.Println(LevelINFO,string(msg)) rsp := LoginRsp{} - err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp) - if err != nil { - gLog.Printf(LvERROR, "wrong login response:%s", err) + if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err) return } if rsp.Error != 0 { @@ -746,3 +738,30 @@ func (pn *P2PNetwork) refreshIPv6(force bool) { } pn.config.publicIPv6 = string(buf[:n]) } + +func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error { + // request peer info + pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode}) + head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout) + if head == nil { + return ErrNetwork // network error, should not be ErrPeerOffline + } + rsp := QueryPeerInfoRsp{} + if err := json.Unmarshal(body, &rsp); err != nil { + return ErrMsgFormat + } + if rsp.Online == 0 { + return ErrPeerOffline + } + if compareVersion(rsp.Version, LeastSupportVersion) == LESS { + return ErrVersionNotCompatible + } + config.peerVersion = rsp.Version + config.hasIPv4 = rsp.HasIPv4 + config.peerIP = rsp.IPv4 + config.peerIPv6 = rsp.IPv6 + config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP + config.peerNatType = rsp.NatType + /// + return nil +} diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index 640c532..912009f 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -8,6 +8,7 @@ import ( "fmt" "math/rand" "net" + "reflect" "sync" "time" ) @@ -32,34 +33,6 @@ type P2PTunnel struct { punchTs uint64 } -func (t *P2PTunnel) requestPeerInfo() error { - // request peer info - t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode}) - head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout) - if head == nil { - return ErrNetwork // network error, should not be ErrPeerOffline - } - rsp := QueryPeerInfoRsp{} - err := json.Unmarshal(body, &rsp) - if err != nil { - gLog.Printf(LvERROR, "wrong QueryPeerInfoRsp:%s", err) - return ErrMsgFormat - } - if rsp.Online == 0 { - return ErrPeerOffline - } - if compareVersion(rsp.Version, LeastSupportVersion) == LESS { - return ErrVersionNotCompatible - } - t.config.peerVersion = rsp.Version - t.config.hasIPv4 = rsp.HasIPv4 - t.config.peerIP = rsp.IPv4 - t.config.peerIPv6 = rsp.IPv6 - t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP - t.config.peerNatType = rsp.NatType - /// - return nil -} func (t *P2PTunnel) initPort() { t.running = true t.hbMtx.Lock() @@ -118,9 +91,8 @@ func (t *P2PTunnel) connect() error { return errors.New("connect error") } rsp := PushConnectRsp{} - err := json.Unmarshal(body, &rsp) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err) + if err := json.Unmarshal(body, &rsp); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err) return err } // gLog.Println(LevelINFO, rsp) @@ -135,7 +107,7 @@ func (t *P2PTunnel) connect() error { t.config.peerConeNatPort = rsp.ConeNatPort t.config.peerIP = rsp.FromIP t.punchTs = rsp.PunchTs - err = t.start() + err := t.start() if err != nil { gLog.Println(LvERROR, "handshake error:", err) err = ErrorHandshake @@ -165,14 +137,10 @@ func (t *P2PTunnel) isActive() bool { } func (t *P2PTunnel) checkActive() bool { - hbt := time.Now() - t.hbMtx.Lock() - if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) { - t.hbMtx.Unlock() + if t.conn == nil { return false } - t.hbMtx.Unlock() - // hbtime within TunnelHeartbeatTime, check it now + hbt := time.Now() t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil) isActive := false // wait at most 5s @@ -184,6 +152,7 @@ func (t *P2PTunnel) checkActive() bool { t.hbMtx.Unlock() time.Sleep(time.Millisecond * 100) } + gLog.Printf(LvINFO, "checkActive %t. hbtime=%d", isActive, t.hbTime) return isActive } @@ -451,6 +420,7 @@ func (t *P2PTunnel) readLoop() { } switch head.SubType { case MsgTunnelHeartbeat: + t.hbTime = time.Now() t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil) gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id) case MsgTunnelHeartbeatAck: @@ -492,9 +462,8 @@ func (t *P2PTunnel) readLoop() { t.pn.relay(tunnelID, body[8:]) case MsgRelayHeartbeat: req := RelayHeartbeat{} - err := json.Unmarshal(body, &req) - if err != nil { - gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err) + if err := json.Unmarshal(body, &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) continue } gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) @@ -514,9 +483,8 @@ func (t *P2PTunnel) readLoop() { t.pn.updateAppHeartbeat(req.AppID) case MsgOverlayConnectReq: req := OverlayConnectReq{} - err := json.Unmarshal(body, &req) - if err != nil { - gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err) + if err := json.Unmarshal(body, &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) continue } // app connect only accept token(not relay totp token), avoid someone using the share relay node's token @@ -559,9 +527,8 @@ func (t *P2PTunnel) readLoop() { go oConn.run() case MsgOverlayDisconnectReq: req := OverlayDisconnectReq{} - err := json.Unmarshal(body, &req) - if err != nil { - gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err) + if err := json.Unmarshal(body, &req); err != nil { + gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) continue } overlayID := req.ID diff --git a/core/protocol.go b/core/protocol.go index 3e9221f..f78baca 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -10,7 +10,7 @@ import ( "time" ) -const OpenP2PVersion = "3.9.1" +const OpenP2PVersion = "3.9.11" const ProductName string = "openp2p" const LeastSupportVersion = "3.0.0" const SyncServerTimeVersion = "3.9.0" @@ -135,17 +135,18 @@ const ( const ( ReadBuffLen = 4096 // for UDP maybe not enough NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow - TunnelHeartbeatTime = time.Second * 15 + TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s TunnelIdleTimeout = time.Minute SymmetricHandshakeNum = 800 // 0.992379 // SymmetricHandshakeNum = 1000 // 0.999510 SymmetricHandshakeInterval = time.Millisecond HandshakeTimeout = time.Second * 5 - PeerAddRelayTimeount = HandshakeTimeout * 2 + PeerAddRelayTimeount = time.Second * 30 // peer need times CheckActiveTimeout = time.Second * 5 PaddingSize = 16 AESKeySize = 16 MaxRetry = 10 + Cone2ConePunchMaxRetry = 3 RetryInterval = time.Second * 30 PublicIPEchoTimeout = time.Second * 1 NatTestTimeout = time.Second * 5 diff --git a/core/update.go b/core/update.go index d1c1f22..bb64b17 100644 --- a/core/update.go +++ b/core/update.go @@ -43,8 +43,7 @@ func update(host string, port int) { return } updateInfo := UpdateInfo{} - err = json.Unmarshal(rspBuf, &updateInfo) - if err != nil { + if err = json.Unmarshal(rspBuf, &updateInfo); err != nil { gLog.Println(LvERROR, rspBuf, " update info decode error:", err) return }