From 471aa5e6ea68626a1606da8d00807e9e43167d44 Mon Sep 17 00:00:00 2001 From: TenderIronh Date: Tue, 18 Nov 2025 17:07:24 +0800 Subject: [PATCH] intranet support udp --- core/handlepush.go | 130 +++++++++++++++++++++----------------- core/p2ptunnel.go | 2 +- core/underlay.go | 3 + core/underlay_tcp.go | 81 +++++++++++------------- core/underlay_tcp_test.go | 21 ++++++ core/v4listener.go | 128 +++++++++++++++++++++++++++---------- 6 files changed, 229 insertions(+), 136 deletions(-) create mode 100644 core/underlay_tcp_test.go diff --git a/core/handlepush.go b/core/handlepush.go index 52df4ca..36d18ad 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -10,6 +10,7 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/pprof" "time" "github.com/openp2p-cn/totp" @@ -21,58 +22,60 @@ func handlePush(subType uint16, msg []byte) error { if err != nil { return err } - gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) + // gLog.d("handle push msg type:%d, push header:%+v", subType, pushHead) switch subType { case MsgPushConnectReq: err = handleConnectReq(msg) case MsgPushRsp: rsp := PushRsp{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil { - gLog.Printf(LvERROR, "Unmarshal pushRsp:%s", err) + gLog.e("Unmarshal pushRsp:%s", err) return err } if rsp.Error == 0 { - gLog.Printf(LvDEBUG, "push ok, detail:%s", rsp.Detail) + gLog.dev("push ok, detail:%s", rsp.Detail) } else { - gLog.Printf(LvERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail) + gLog.e("push error:%d, detail:%s", rsp.Error, rsp.Detail) } case MsgPushAddRelayTunnelReq: req := AddRelayTunnelReq{} if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err) + gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err) return err } config := AppConfig{} config.PeerNode = req.RelayName config.peerToken = req.RelayToken config.relayMode = req.RelayMode + config.PunchPriority = req.PunchPriority + config.UnderlayProtocol = req.UnderlayProtocol go func(r AddRelayTunnelReq) { t, errDt := GNetwork.addDirectTunnel(config, 0) - if errDt == nil { + if errDt == nil && t != nil { // notify peer relay ready msg := TunnelMsg{ID: t.id} GNetwork.push(r.From, MsgPushAddRelayTunnelRsp, msg) appConfig := config appConfig.PeerNode = req.From } else { - gLog.Printf(LvERROR, "addDirectTunnel error:%s", errDt) + gLog.w("addDirectTunnel error:%s", errDt) GNetwork.push(r.From, MsgPushAddRelayTunnelRsp, "error") // compatible with old version client, trigger unmarshal error } }(req) case MsgPushServerSideSaveMemApp: req := ServerSideSaveMemApp{} if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err) + gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err) return err } gLog.Println(LvDEBUG, "handle MsgPushServerSideSaveMemApp:", prettyJson(req)) var existTunnel *P2PTunnel i, ok := GNetwork.allTunnels.Load(req.TunnelID) if !ok { - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 3000) i, ok = GNetwork.allTunnels.Load(req.TunnelID) // retry sometimes will receive MsgPushServerSideSaveMemApp but p2ptunnel not store yet. if !ok { - gLog.Println(LvERROR, "handle MsgPushServerSideSaveMemApp error:", ErrMemAppTunnelNotFound) + gLog.e("handle MsgPushServerSideSaveMemApp error:%s", ErrMemAppTunnelNotFound) return ErrMemAppTunnelNotFound } } @@ -91,10 +94,10 @@ func handlePush(subType uint16, msg []byte) error { } else { app.setRelayTunnel(existTunnel) } - gLog.Println(LvDEBUG, "found existing memapp, update it") + gLog.d("found existing memapp, update it") } else { appConfig := existTunnel.config - appConfig.SrcPort = 0 + appConfig.SrcPort = int(req.SrcPort) appConfig.Protocol = "" appConfig.AppName = fmt.Sprintf("%d", peerID) appConfig.PeerNode = req.From @@ -118,22 +121,15 @@ func handlePush(subType uint16, msg []byte) error { } return nil - case MsgPushAPPKey: - req := APPKeySync{} - if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err) - return err - } - SaveKey(req.AppID, req.AppKey) case MsgPushUpdate: - gLog.Println(LvINFO, "MsgPushUpdate") + gLog.i("MsgPushUpdate") err := update(gConf.Network.ServerHost, gConf.Network.ServerPort) if err == nil { - os.Exit(0) + os.Exit(9) // 9 tell daemon this exit because of update } return err case MsgPushRestart: - gLog.Println(LvINFO, "MsgPushRestart") + gLog.i("MsgPushRestart") os.Exit(0) return err case MsgPushReportApps: @@ -144,29 +140,31 @@ func handlePush(subType uint16, msg []byte) error { err = handleLog(msg) case MsgPushReportGoroutine: err = handleReportGoroutine() + case MsgPushReportHeap: + err = handleReportHeap() case MsgPushCheckRemoteService: err = handleCheckRemoteService(msg) case MsgPushEditApp: err = handleEditApp(msg) case MsgPushEditNode: - gLog.Println(LvINFO, "MsgPushEditNode") + gLog.i("MsgPushEditNode") req := EditNode{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } gConf.setNode(req.NewName) gConf.setShareBandwidth(req.Bandwidth) os.Exit(0) case MsgPushSwitchApp: - gLog.Println(LvINFO, "MsgPushSwitchApp") + gLog.i("MsgPushSwitchApp") app := AppInfo{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:])) return err } - config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol} - gLog.Println(LvINFO, app.AppName, " switch to ", app.Enabled) + config := AppConfig{PeerNode: app.PeerNode, Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol} + gLog.i("%s switch to %d", app.AppName, app.Enabled) gConf.switchApp(config, app.Enabled) if app.Enabled == 0 { // disable APP @@ -175,10 +173,10 @@ func handlePush(subType uint16, msg []byte) error { case MsgPushDstNodeOnline: req := PushDstNodeOnline{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } - gLog.Printf(LvINFO, "%s online, retryApp", req.Node) + gLog.i("%s online, retryApp", req.Node) gConf.retryApp(req.Node) default: i, ok := GNetwork.msgMap.Load(pushHead.From) @@ -192,10 +190,10 @@ func handlePush(subType uint16, msg []byte) error { } func handleEditApp(msg []byte) (err error) { - gLog.Println(LvINFO, "MsgPushEditApp") + gLog.i("MsgPushEditApp") newApp := AppInfo{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:])) return err } oldConf := AppConfig{Enabled: 1} @@ -211,13 +209,16 @@ func handleEditApp(msg []byte) (err error) { gConf.delete(oldConf) } - // AddApp - newConf := oldConf - newConf.Protocol = newApp.Protocol - newConf.SrcPort = newApp.SrcPort - newConf.RelayNode = newApp.SpecRelayNode - newConf.PunchPriority = newApp.PunchPriority - gConf.add(newConf, false) + if newApp.SrcPort != 0 { // delete app + // AddApp + newConf := oldConf + newConf.Protocol = newApp.Protocol + newConf.SrcPort = newApp.SrcPort + newConf.RelayNode = newApp.SpecRelayNode + newConf.PunchPriority = newApp.PunchPriority + gConf.add(newConf, false) + } + if newApp.Protocol0 != "" && newApp.SrcPort0 != 0 { // not edit GNetwork.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end } @@ -227,12 +228,12 @@ func handleEditApp(msg []byte) (err error) { func handleConnectReq(msg []byte) (err error) { req := PushConnectReq{} if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s", reflect.TypeOf(req), err) + gLog.e("Unmarshal %v:%s", reflect.TypeOf(req), err) return err } - gLog.Printf(LvDEBUG, "%s is connecting... push connect response", req.From) + gLog.d("%s is connecting... push connect response", req.From) if compareVersion(req.Version, LeastSupportVersion) < 0 { - gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From) + gLog.e("%s:%s", ErrVersionNotCompatible.Error(), req.From) rsp := PushConnectRsp{ Error: 10, Detail: ErrVersionNotCompatible.Error(), @@ -245,7 +246,7 @@ func handleConnectReq(msg []byte) (err error) { // verify totp token or token t := totp.TOTP{Step: totp.RelayTOTPStep} if t.Verify(req.Token, gConf.Network.Token, time.Now().Unix()-GNetwork.dt/int64(time.Second)) { // localTs may behind, auto adjust ts - gLog.Printf(LvINFO, "handleConnectReq Access Granted") + gLog.d("handleConnectReq Access Granted") config := AppConfig{} config.peerNatType = req.NatType config.peerConeNatPort = req.ConeNatPort @@ -261,7 +262,7 @@ func handleConnectReq(msg []byte) (err error) { config.UnderlayProtocol = req.UnderlayProtocol // share relay node will limit bandwidth if req.Token != gConf.Network.Token { - gLog.Printf(LvINFO, "set share bandwidth %d mbps", gConf.Network.ShareBandwidth) + gLog.i("set share bandwidth %d mbps", gConf.Network.ShareBandwidth) config.shareBandwidth = gConf.Network.ShareBandwidth } // go GNetwork.AddTunnel(config, req.ID) @@ -270,7 +271,7 @@ func handleConnectReq(msg []byte) (err error) { }() return nil } - gLog.Println(LvERROR, "handleConnectReq Access Denied:", req.From) + gLog.e("handleConnectReq Access Denied:%s", req.From) rsp := PushConnectRsp{ Error: 1, Detail: fmt.Sprintf("connect to %s error: Access Denied", gConf.Network.Node), @@ -281,10 +282,10 @@ func handleConnectReq(msg []byte) (err error) { } func handleReportApps() (err error) { - gLog.Println(LvINFO, "MsgPushReportApps") + gLog.i("MsgPushReportApps") req := ReportApps{} - gConf.mtx.Lock() - defer gConf.mtx.Unlock() + gConf.mtx.RLock() + defer gConf.mtx.RUnlock() for _, config := range gConf.Apps { appActive := 0 @@ -346,10 +347,8 @@ func handleReportApps() (err error) { } func handleReportMemApps() (err error) { - gLog.Println(LvINFO, "handleReportMemApps") + gLog.i("handleReportMemApps") req := ReportApps{} - gConf.mtx.Lock() - defer gConf.mtx.Unlock() GNetwork.sdwan.sysRoute.Range(func(key, value interface{}) bool { node := value.(*sdwanNode) appActive := 0 @@ -405,12 +404,12 @@ func handleReportMemApps() (err error) { } func handleLog(msg []byte) (err error) { - gLog.Println(LvDEBUG, "MsgPushReportLog") + gLog.d("MsgPushReportLog") const defaultLen = 1024 * 128 const maxLen = 1024 * 1024 req := ReportLogReq{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } if req.FileName == "" { @@ -418,9 +417,12 @@ func handleLog(msg []byte) (err error) { } else { req.FileName = sanitizeFileName(req.FileName) } + if req.IsSetLogLevel == 1 { + gLog.setLevel(LogLevel(req.LogLevel)) + } f, err := os.Open(filepath.Join("log", req.FileName)) if err != nil { - gLog.Println(LvERROR, "read log file error:", err) + gLog.e("read log file error:%s", err) return err } fi, err := f.Stat() @@ -443,7 +445,7 @@ func handleLog(msg []byte) (err error) { readLength, err := f.Read(buff) f.Close() if err != nil { - gLog.Println(LvERROR, "read log content error:", err) + gLog.e("read log content error:%s", err) return err } rsp := ReportLogRsp{} @@ -455,17 +457,27 @@ func handleLog(msg []byte) (err error) { } func handleReportGoroutine() (err error) { - gLog.Println(LvDEBUG, "handleReportGoroutine") + gLog.d("handleReportGoroutine") buf := make([]byte, 1024*128) stackLen := runtime.Stack(buf, true) - return GNetwork.write(MsgReport, MsgPushReportLog, string(buf[:stackLen])) + return GNetwork.write(MsgReport, MsgReportResponse, string(buf[:stackLen])) +} + +func handleReportHeap() error { + var buf bytes.Buffer + err := pprof.Lookup("heap").WriteTo(&buf, 1) + if err != nil { + return err + } + + return GNetwork.write(MsgReport, MsgReportResponse, buf.String()) } func handleCheckRemoteService(msg []byte) (err error) { - gLog.Println(LvDEBUG, "handleCheckRemoteService") + gLog.d("handleCheckRemoteService") req := CheckRemoteService{} if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil { - gLog.Printf(LvERROR, "Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) + gLog.e("Unmarshal %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:])) return err } rsp := PushRsp{Error: 0} diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index a9896a0..c82d295 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -331,7 +331,7 @@ func (t *P2PTunnel) connectUnderlayUDP() (c underlay, err error) { func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { gLog.Printf(LvDEBUG, "connectUnderlayTCP %s start ", t.config.LogPeerNode()) defer gLog.Printf(LvDEBUG, "connectUnderlayTCP %s end ", t.config.LogPeerNode()) - var ul *underlayTCP + var ul underlay peerIP := t.config.peerIP if t.config.linkMode == LinkModeIntranet { peerIP = t.config.peerLanIP diff --git a/core/underlay.go b/core/underlay.go index 0c0a42f..f889d17 100644 --- a/core/underlay.go +++ b/core/underlay.go @@ -2,6 +2,7 @@ package openp2p import ( "io" + "net" "time" ) @@ -18,6 +19,7 @@ type underlay interface { SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error Protocol() string + RemoteAddr() net.Addr } func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) { @@ -28,6 +30,7 @@ func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) { } head, err := decodeHeader(headBuf) if err != nil || head.MainType > 16 { + gLog.d("DefaultReadBuffer error:%v, %d", err, head.MainType) return nil, nil, err } dataBuf := make([]byte, head.DataLen) diff --git a/core/underlay_tcp.go b/core/underlay_tcp.go index c4a6eb2..de5f0df 100644 --- a/core/underlay_tcp.go +++ b/core/underlay_tcp.go @@ -46,19 +46,25 @@ func (conn *underlayTCP) WUnlock() { conn.writeMtx.Unlock() } -func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) { - if mode == LinkModeTCPPunch { +func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (underlay, error) { + if mode == LinkModeTCPPunch || mode == LinkModeTCP6 { if compareVersion(t.config.peerVersion, SyncServerTimeVersion) < 0 { - gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) + gLog.d("peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { ts := time.Duration(int64(t.punchTs) + GNetwork.dt - time.Now().UnixNano()) - gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) + gLog.d("sleep %d ms", ts/time.Millisecond) time.Sleep(ts) } - gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) - c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) + // gLog.d(" send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) + var c net.Conn + var err error + if mode == LinkModeTCPPunch { + c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) + } else { + c, err = reuse.DialTimeout("tcp6", fmt.Sprintf("[::]:%d", localPort), fmt.Sprintf("[%s]:%d", t.config.peerIPv6, port), CheckActiveTimeout) + } if err != nil { - gLog.Println(LvDEBUG, "send tcp punch: ", err) + // gLog.d("send tcp punch: ", err) return nil, err } utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c} @@ -67,7 +73,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) return nil, fmt.Errorf("read start msg error:%s", err) } if buff != nil { - gLog.Println(LvDEBUG, string(buff)) + gLog.d("handshake flag:%s", string(buff)) } utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) return utcp, nil @@ -77,59 +83,46 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) if compareVersion(t.config.peerVersion, PublicIPVersion) < 0 { // old version ipBytes := net.ParseIP(t.config.peerIP).To4() tid = uint64(binary.BigEndian.Uint32(ipBytes)) - gLog.Println(LvDEBUG, "compatible with old client, use ip as key:", tid) + gLog.d("compatible with old client, use ip as key:%d", tid) } - var utcp *underlayTCP - if mode == LinkModeIntranet && gConf.Network.hasIPv4 == 0 && gConf.Network.hasUPNPorNATPMP == 0 { - addr, _ := net.ResolveTCPAddr("tcp4", fmt.Sprintf("0.0.0.0:%d", localPort)) - l, err := net.ListenTCP("tcp4", addr) - if err != nil { - gLog.Printf(LvERROR, "listen %d error:", localPort, err) - return nil, err - } - defer l.Close() - err = l.SetDeadline(time.Now().Add(UnderlayTCPConnectTimeout)) - if err != nil { - gLog.Printf(LvERROR, "set listen timeout:", err) - return nil, err - } - c, err := l.Accept() - if err != nil { - return nil, err - } - utcp = &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c} - } else { - if v4l != nil { - utcp = v4l.getUnderlayTCP(tid) - } + var ul underlay + if v4l != nil { + ul = v4l.getUnderlay(tid) } - - if utcp == nil { + if ul == nil { return nil, ErrConnectPublicV4 } - return utcp, nil + return ul, nil } func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) { var c net.Conn var err error - if mode == LinkModeTCPPunch { - gLog.Println(LvDev, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) - if c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout); err != nil { - gLog.Println(LvDev, "send tcp punch: ", err) - } + network := "tcp" + localAddr := fmt.Sprintf("0.0.0.0:%d", localPort) + remoteAddr := fmt.Sprintf("%s:%d", host, port) + if mode == LinkModeTCP6 { // address need [ip] + network = "tcp6" + localAddr = fmt.Sprintf("[::]:%d", localPort) + remoteAddr = fmt.Sprintf("[%s]:%d", host, port) + } + if mode == LinkModeTCP4 || mode == LinkModeIntranet { // random port + localAddr = fmt.Sprintf("0.0.0.0:%d", 0) + } + gLog.dev("send tcp punch: %s --> %s", localAddr, remoteAddr) - } else { - c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) + c, err = reuse.DialTimeout(network, localAddr, remoteAddr, CheckActiveTimeout) + if err != nil { + gLog.dev("send tcp punch: %v", err) } if err != nil { - gLog.Printf(LvDev, "Dial %s:%d error:%s", host, port, err) + gLog.dev("Dial %s:%d error:%s", host, port, err) return nil, err } tc := c.(*net.TCPConn) tc.SetKeepAlive(true) tc.SetKeepAlivePeriod(UnderlayTCPKeepalive) - gLog.Printf(LvDEBUG, "Dial %s:%d OK", host, port) + gLog.d("Dial %s:%d OK", host, port) return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil } diff --git a/core/underlay_tcp_test.go b/core/underlay_tcp_test.go new file mode 100644 index 0000000..0b40d39 --- /dev/null +++ b/core/underlay_tcp_test.go @@ -0,0 +1,21 @@ +package openp2p + +import ( + "os" + "path/filepath" + "testing" +) + +func TestDialTCP(t *testing.T) { + baseDir := filepath.Dir(os.Args[0]) + os.Chdir(baseDir) // for system service + gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFile|LogConsole) + // ul, err := dialTCP("[240e:3b1:6f6:d14:1c0b:9605:554d:351c]", 3389, 0, LinkModeTCP6) + // if err != nil || ul == nil { + // t.Error("dialTCP error:", err) + // } + ul, err := dialTCP("192.168.3.9", 3389, 0, LinkModeTCP6) + if err != nil || ul == nil { + t.Error("dialTCP error:", err) + } +} diff --git a/core/v4listener.go b/core/v4listener.go index 7931908..87f7cbd 100644 --- a/core/v4listener.go +++ b/core/v4listener.go @@ -1,68 +1,130 @@ package openp2p import ( + "context" "encoding/binary" "fmt" "net" "sync" "time" + + "github.com/quic-go/quic-go" ) type v4Listener struct { - conns sync.Map - port int - acceptCh chan bool + conns sync.Map + port int + acceptCh chan bool + running bool + tcpListener *net.TCPListener + udpListener quic.Listener + wg sync.WaitGroup } -func (vl *v4Listener) start() error { +func (vl *v4Listener) start() { + vl.running = true v4l.acceptCh = make(chan bool, 500) - for { - vl.listen() - time.Sleep(UnderlayTCPConnectTimeout) - } + vl.wg.Add(1) + go func() { + defer vl.wg.Done() + for vl.running { + vl.listenTCP() + time.Sleep(UnderlayTCPConnectTimeout) + } + }() + vl.wg.Add(1) + go func() { + defer vl.wg.Done() + for vl.running { + vl.listenUDP() + time.Sleep(UnderlayTCPConnectTimeout) + } + }() } -func (vl *v4Listener) listen() error { - gLog.Printf(LvINFO, "v4Listener listen %d start", vl.port) - defer gLog.Printf(LvINFO, "v4Listener listen %d end", vl.port) - addr, _ := net.ResolveTCPAddr("tcp4", fmt.Sprintf("0.0.0.0:%d", vl.port)) - l, err := net.ListenTCP("tcp4", addr) +func (vl *v4Listener) stop() { + vl.running = false + if vl.tcpListener != nil { + vl.tcpListener.Close() + } + if vl.udpListener != nil { + vl.udpListener.Close() + } + vl.wg.Wait() +} + +func (vl *v4Listener) listenTCP() error { + gLog.d("v4Listener listenTCP %d start", vl.port) + defer gLog.d("v4Listener listenTCP %d end", vl.port) + addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port)) // system will auto listen both v4 and v6 + var err error + vl.tcpListener, err = net.ListenTCP("tcp", addr) if err != nil { - gLog.Printf(LvERROR, "v4Listener listen %d error:", vl.port, err) + gLog.e("v4Listener listen %d error:", vl.port, err) return err } - defer l.Close() + defer vl.tcpListener.Close() for { - c, err := l.Accept() + c, err := vl.tcpListener.Accept() if err != nil { break } - go vl.handleConnection(c) + utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c, connectTime: time.Now()} + go vl.handleConnection(utcp) } + vl.tcpListener = nil return nil } -func (vl *v4Listener) handleConnection(c net.Conn) { - gLog.Println(LvDEBUG, "v4Listener accept connection: ", c.RemoteAddr().String()) - utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c, connectTime: time.Now()} - utcp.SetReadDeadline(time.Now().Add(UnderlayTCPConnectTimeout)) - _, buff, err := utcp.ReadBuffer() + +func (vl *v4Listener) listenUDP() error { + gLog.d("v4Listener listenUDP %d start", vl.port) + defer gLog.d("v4Listener listenUDP %d end", vl.port) + var err error + vl.udpListener, err = quic.ListenAddr(fmt.Sprintf("0.0.0.0:%d", vl.port), generateTLSConfig(), + &quic.Config{Versions: quicVersion, MaxIdleTimeout: TunnelIdleTimeout, DisablePathMTUDiscovery: true}) if err != nil { - gLog.Println(LvERROR, "utcp.ReadBuffer error:", err) + return err } - utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) + ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout) + defer cancel() + defer vl.udpListener.Close() + for { + sess, err := vl.udpListener.Accept(context.Background()) + if err != nil { + break + } + stream, err := sess.AcceptStream(ctx) + if err != nil { + break + } + ul := &underlayQUIC{writeMtx: &sync.Mutex{}, Stream: stream, Connection: sess} + go vl.handleConnection(ul) + } + vl.udpListener = nil + return err +} + +func (vl *v4Listener) handleConnection(ul underlay) { + gLog.d("v4Listener accept connection: %s", ul.RemoteAddr().String()) + ul.SetReadDeadline(time.Now().Add(UnderlayTCPConnectTimeout)) + _, buff, err := ul.ReadBuffer() + if err != nil || buff == nil { + gLog.e("v4Listener read MsgTunnelHandshake error:%s", err) + } + ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) var tid uint64 if string(buff) == "OpenP2P,hello" { // old client // save remoteIP as key - remoteAddr := c.RemoteAddr().(*net.TCPAddr).IP + remoteAddr := ul.RemoteAddr().(*net.TCPAddr).IP ipBytes := remoteAddr.To4() tid = uint64(binary.BigEndian.Uint32(ipBytes)) // bytes not enough for uint64 - gLog.Println(LvDEBUG, "hello ", string(buff)) + gLog.d("hello %s", string(buff)) } else { if len(buff) < 8 { return } tid = binary.LittleEndian.Uint64(buff[:8]) - gLog.Println(LvDEBUG, "hello ", tid) + gLog.d("hello %d", tid) } // clear timeout connection vl.conns.Range(func(idx, i interface{}) bool { @@ -72,20 +134,22 @@ func (vl *v4Listener) handleConnection(c net.Conn) { } return true }) - vl.conns.Store(tid, utcp) - if len(vl.acceptCh) == 0 { - vl.acceptCh <- true + vl.conns.Store(tid, ul) + select { + case vl.acceptCh <- true: + default: + gLog.e("msgQueue full, drop it") } } -func (vl *v4Listener) getUnderlayTCP(tid uint64) *underlayTCP { +func (vl *v4Listener) getUnderlay(tid uint64) underlay { for i := 0; i < 100; i++ { select { case <-time.After(time.Millisecond * 50): case <-vl.acceptCh: } if u, ok := vl.conns.LoadAndDelete(tid); ok { - return u.(*underlayTCP) + return u.(underlay) } } return nil