diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index 0f0865a..1132e3d 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -149,13 +149,17 @@ func (pn *P2PNetwork) keepAlive() { time.Sleep(time.Second * 10) // Skip check if we're waking from sleep/hibernation now := time.Now() - if !lastCheckTime.IsZero() && now.Sub(lastCheckTime) > time.Minute*2 { + if !lastCheckTime.IsZero() && now.Sub(lastCheckTime) > NetworkHeartbeatTime*3 { gLog.i("Detected possible sleep/wake cycle, skipping this check") lastCheckTime = now continue } lastCheckTime = now - if pn.hbTime.Before(time.Now().Add(-3*time.Minute)) && pn.initTime.Before(time.Now().Add(-3*time.Minute)) { + if pn.hbTime.Before(time.Now().Add(-NetworkHeartbeatTime * 3)) { + if pn.initTime.After(time.Now().Add(-NetworkHeartbeatTime * 3)) { + gLog.d("Init less than 3 mins, skipping this check") + continue + } gLog.e("P2PNetwork keepAlive error, exit worker") dumpStack() os.Exit(9) @@ -568,7 +572,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne var primaryPunchFunc func() (*P2PTunnel, error) var secondaryPunchFunc func() (*P2PTunnel, error) funcUDP := func() (t *P2PTunnel, err error) { - if thisTunnelForcev6 && config.PunchPriority&PunchPriorityTCPOnly != 0 { + if thisTunnelForcev6 || config.PunchPriority&PunchPriorityTCPOnly != 0 { return } // try UDPPunch @@ -588,7 +592,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne return } funcTCP := func() (t *P2PTunnel, err error) { - if thisTunnelForcev6 && config.PunchPriority&PunchPriorityUDPOnly != 0 { + if thisTunnelForcev6 || config.PunchPriority&PunchPriorityUDPOnly != 0 { return } // try TCPPunch @@ -661,6 +665,13 @@ func (pn *P2PNetwork) init() error { pn.wgReconnect.Add(1) defer pn.wgReconnect.Done() var err error + defer func() { + if err != nil { + // init failed, retry + pn.close(true) + gLog.e("P2PNetwork init error:%s", err) + } + }() ips, err := resolveServerIP(gConf.Network.ServerHost) if err != nil { gLog.e("resolve dns failed: %v", err) @@ -791,11 +802,7 @@ func (pn *P2PNetwork) init() error { gLog.d("P2PNetwork init ok") break } - if err != nil { - // init failed, retry - pn.close(true) - gLog.e("P2PNetwork init error:%s", err) - } + return err } @@ -877,23 +884,48 @@ func (pn *P2PNetwork) readLoop() { gLog.d("P2PNetwork readLoop start") pn.wgReconnect.Add(1) defer pn.wgReconnect.Done() - for pn.running { - pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second)) - _, msg, err := pn.conn.ReadMessage() - if err != nil { - gLog.e("P2PNetwork read error:%s", err) - if closeErr, ok := err.(*websocket.CloseError); ok { - if closeErr.Code == 1006 { - pn.close(true) - } else { - pn.close(false) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 使用带超时的 goroutine 读取 + readChan := make(chan []byte, 10) + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second)) + _, msg, err := pn.conn.ReadMessage() + if err != nil { + gLog.e("ReadMessage error:%s", err) + readChan <- nil + return } - } else { - pn.close(false) + readChan <- msg } - break } - pn.handleMessage(msg) + }() + + readTimeout := 60 * time.Second + + for pn.running { + select { + case result := <-readChan: + if result == nil { + // 处理错误 + pn.close(false) + return + } + pn.handleMessage(result) + + case <-time.After(readTimeout): + gLog.e("ReadMessage timeout after %v", readTimeout) + cancel() + pn.close(false) + return + } } gLog.d("P2PNetwork readLoop end") }