diff --git a/daemon.go b/daemon.go index 5ce94a9..d8df4ed 100644 --- a/daemon.go +++ b/daemon.go @@ -115,6 +115,17 @@ func install() { gLog.Println(LevelINFO, "install start") defer gLog.Println(LevelINFO, "install end") // auto uninstall + err := os.MkdirAll(defaultInstallPath, 0775) + + if err != nil { + gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err) + return + } + err = os.Chdir(defaultInstallPath) + if err != nil { + gLog.Println(LevelERROR, "cd error:", err) + return + } uninstall() // save config file @@ -132,19 +143,22 @@ func install() { shareBandwidth := installFlag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit") logLevel := installFlag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error") installFlag.Parse(os.Args[2:]) - if *node != "" && len(*node) < 8 { - gLog.Println(LevelERROR, ErrNodeTooShort) - os.Exit(9) - } - if *node == "" { // if node name not set. use os.Hostname - hostname := defaultNodeName() - node = &hostname - } + gConf.load() // load old config. otherwise will clear all apps gConf.LogLevel = *logLevel gConf.Network.ServerHost = *serverHost gConf.Network.Token = *token - gConf.Network.Node = *node + if *node != "" { + if len(*node) < 8 { + gLog.Println(LevelERROR, ErrNodeTooShort) + os.Exit(9) + } + gConf.Network.Node = *node + } else { + if gConf.Network.Node == "" { // if node name not set. use os.Hostname + gConf.Network.Node = defaultNodeName() + } + } gConf.Network.ServerPort = 27183 gConf.Network.UDPPort1 = 27182 gConf.Network.UDPPort2 = 27183 @@ -159,16 +173,6 @@ func install() { if config.SrcPort != 0 { gConf.add(config, true) } - err := os.MkdirAll(defaultInstallPath, 0775) - if err != nil { - gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err) - return - } - err = os.Chdir(defaultInstallPath) - if err != nil { - gLog.Println(LevelERROR, "cd error:", err) - return - } gConf.save() targetPath := filepath.Join(defaultInstallPath, defaultBinName) d := daemon{} diff --git a/handlepush.go b/handlepush.go index 579c2df..12072a5 100644 --- a/handlepush.go +++ b/handlepush.go @@ -110,7 +110,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { case MsgPushReportApps: gLog.Println(LevelINFO, "MsgPushReportApps") req := ReportApps{} - // TODO: add the retrying apps gConf.mtx.Lock() defer gConf.mtx.Unlock() for _, config := range gConf.Apps { diff --git a/log.go b/log.go index 227f510..a5976d6 100644 --- a/log.go +++ b/log.go @@ -110,10 +110,10 @@ func (vl *V8log) checkFile() { for l, logFile := range vl.files { f, e := logFile.Stat() if e != nil { - break + continue } if f.Size() <= vl.maxLogSize { - break + continue } logFile.Close() fname := f.Name() diff --git a/openp2p.go b/openp2p.go index 6c4524e..338a252 100644 --- a/openp2p.go +++ b/openp2p.go @@ -13,7 +13,6 @@ func main() { binDir := filepath.Dir(os.Args[0]) os.Chdir(binDir) // for system service gLog = InitLogger(binDir, "openp2p", LevelDEBUG, 1024*1024, LogFileAndConsole) - // TODO: install sub command, deamon process if len(os.Args) > 1 { switch os.Args[1] { diff --git a/overlay.go b/overlay.go new file mode 100644 index 0000000..f805f41 --- /dev/null +++ b/overlay.go @@ -0,0 +1,150 @@ +package main + +import ( + "bytes" + "encoding/binary" + "errors" + "net" + "time" +) + +var ErrDeadlineExceeded error = &DeadlineExceededError{} + +// DeadlineExceededError is returned for an expired deadline. +type DeadlineExceededError struct{} + +// Implement the net.Error interface. +// The string is "i/o timeout" because that is what was returned +// by earlier Go versions. Changing it may break programs that +// match on error strings. +func (e *DeadlineExceededError) Error() string { return "i/o timeout" } +func (e *DeadlineExceededError) Timeout() bool { return true } +func (e *DeadlineExceededError) Temporary() bool { return true } + +// implement io.Writer +type overlayConn struct { + tunnel *P2PTunnel + connTCP net.Conn + id uint64 + rtid uint64 + running bool + isClient bool + appID uint64 + appKey uint64 + appKeyBytes []byte + // for udp + connUDP *net.UDPConn + remoteAddr net.Addr + udpRelayData chan []byte + lastReadUDPTs time.Time +} + +func (oConn *overlayConn) run() { + gLog.Printf(LevelDEBUG, "%d overlayConn run start", oConn.id) + defer gLog.Printf(LevelDEBUG, "%d overlayConn run end", oConn.id) + oConn.running = true + oConn.lastReadUDPTs = time.Now() + buffer := make([]byte, ReadBuffLen+PaddingSize) + readBuf := buffer[:ReadBuffLen] + encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding + tunnelHead := new(bytes.Buffer) + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, oConn.rtid) + binary.Write(tunnelHead, binary.LittleEndian, oConn.id) + for oConn.running && oConn.tunnel.isRuning() { + buff, dataLen, err := oConn.Read(readBuf) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } + // overlay tcp connection normal close, debug log + gLog.Printf(LevelDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err) + break + } + payload := buff[:dataLen] + if oConn.appKey != 0 { + payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen) + } + writeBytes := append(tunnelHead.Bytes(), payload...) + if oConn.rtid == 0 { + oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) + gLog.Printf(LevelDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + } else { + // write raley data + all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) + all = append(all, writeBytes...) + oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) + gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + } + } + if oConn.connTCP != nil { + oConn.connTCP.Close() + } + if oConn.connUDP != nil { + oConn.connUDP.Close() + } + oConn.tunnel.overlayConns.Delete(oConn.id) + // notify peer disconnect + if oConn.isClient { + req := OverlayDisconnectReq{ID: oConn.id} + if oConn.rtid == 0 { + oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + } else { + // write relay data + msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + } + } +} + +func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) { + if oConn.connUDP != nil { + if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) { + err = errors.New("udp close") + return + } + if oConn.remoteAddr != nil { // as server + select { + case buff = <-oConn.udpRelayData: + n = len(buff) + oConn.lastReadUDPTs = time.Now() + case <-time.After(time.Second * 10): + err = ErrDeadlineExceeded + } + } else { // as client + oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, _, err = oConn.connUDP.ReadFrom(reuseBuff) + if err == nil { + oConn.lastReadUDPTs = time.Now() + } + buff = reuseBuff + } + return + } + oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5)) + n, err = oConn.connTCP.Read(reuseBuff) + buff = reuseBuff + return +} + +// calling by p2pTunnel +func (oConn *overlayConn) Write(buff []byte) (n int, err error) { + // add mutex when multi-thread calling + if oConn.connUDP != nil { + if oConn.remoteAddr == nil { + n, err = oConn.connUDP.Write(buff) + } else { + n, err = oConn.connUDP.WriteTo(buff, oConn.remoteAddr) + } + if err != nil { + oConn.running = false + } + return + } + n, err = oConn.connTCP.Write(buff) + if err != nil { + oConn.running = false + } + return +} diff --git a/overlaytcp.go b/overlaytcp.go deleted file mode 100644 index 142b395..0000000 --- a/overlaytcp.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "bytes" - "encoding/binary" - "net" - "time" -) - -// implement io.Writer -type overlayTCP struct { - tunnel *P2PTunnel - conn net.Conn - id uint64 - rtid uint64 - running bool - isClient bool - appID uint64 - appKey uint64 - appKeyBytes []byte -} - -func (otcp *overlayTCP) run() { - gLog.Printf(LevelDEBUG, "%d overlayTCP run start", otcp.id) - defer gLog.Printf(LevelDEBUG, "%d overlayTCP run end", otcp.id) - otcp.running = true - buffer := make([]byte, ReadBuffLen+PaddingSize) - readBuf := buffer[:ReadBuffLen] - encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding - tunnelHead := new(bytes.Buffer) - relayHead := new(bytes.Buffer) - binary.Write(relayHead, binary.LittleEndian, otcp.rtid) - binary.Write(tunnelHead, binary.LittleEndian, otcp.id) - for otcp.running && otcp.tunnel.isRuning() { - otcp.conn.SetReadDeadline(time.Now().Add(time.Second * 5)) - dataLen, err := otcp.conn.Read(readBuf) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Timeout() { - continue - } - // overlay tcp connection normal close, debug log - gLog.Printf(LevelDEBUG, "overlayTCP %d read error:%s,close it", otcp.id, err) - break - } else { - payload := readBuf[:dataLen] - if otcp.appKey != 0 { - payload, _ = encryptBytes(otcp.appKeyBytes, encryptData, buffer[:dataLen], dataLen) - } - writeBytes := append(tunnelHead.Bytes(), payload...) - if otcp.rtid == 0 { - otcp.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) - } else { - // write raley data - all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) - all = append(all, writeBytes...) - otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) - gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", otcp.rtid, otcp.id, len(writeBytes)) - } - } - } - otcp.conn.Close() - otcp.tunnel.overlayConns.Delete(otcp.id) - // notify peer disconnect - if otcp.isClient { - req := OverlayDisconnectReq{ID: otcp.id} - if otcp.rtid == 0 { - otcp.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) - } else { - // write relay data - msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req) - msgWithHead := append(relayHead.Bytes(), msg...) - otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) - } - } -} - -// calling by p2pTunnel -func (otcp *overlayTCP) Write(buff []byte) (n int, err error) { - // add mutex when multi-thread calling - n, err = otcp.conn.Write(buff) - if err != nil { - otcp.tunnel.overlayConns.Delete(otcp.id) - } - return -} diff --git a/p2papp.go b/p2papp.go index 43f6862..da63519 100644 --- a/p2papp.go +++ b/p2papp.go @@ -6,23 +6,26 @@ import ( "fmt" "math/rand" "net" + "strconv" + "strings" "sync" "time" ) type p2pApp struct { - config AppConfig - listener net.Listener - tunnel *P2PTunnel - rtid uint64 - relayNode string - relayMode string - hbTime time.Time - hbMtx sync.Mutex - running bool - id uint64 - key uint64 - wg sync.WaitGroup + config AppConfig + listener net.Listener + listenerUDP *net.UDPConn + tunnel *P2PTunnel + rtid uint64 + relayNode string + relayMode string + hbTime time.Time + hbMtx sync.Mutex + running bool + id uint64 + key uint64 + wg sync.WaitGroup } func (app *p2pApp) isActive() bool { @@ -45,43 +48,42 @@ func (app *p2pApp) updateHeartbeat() { } func (app *p2pApp) listenTCP() error { + gLog.Printf(LevelDEBUG, "tcp accept on port %d start", app.config.SrcPort) + defer gLog.Printf(LevelDEBUG, "tcp accept on port %d end", app.config.SrcPort) var err error - if app.config.Protocol == "udp" { - app.listener, err = net.Listen("udp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) - } else { - app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) - } - + app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) if err != nil { gLog.Printf(LevelERROR, "listen error:%s", err) return err } - for { + for app.running { conn, err := app.listener.Accept() if err != nil { - gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err) + if app.running { + gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err) + } break } - otcp := overlayTCP{ + oConn := overlayConn{ tunnel: app.tunnel, - conn: conn, + connTCP: conn, id: rand.Uint64(), isClient: true, rtid: app.rtid, appID: app.id, appKey: app.key, } - // calc key bytes for encrypt - if otcp.appKey != 0 { + // pre-calc key bytes for encrypt + if oConn.appKey != 0 { encryptKey := make([]byte, AESKeySize) - binary.LittleEndian.PutUint64(encryptKey, otcp.appKey) - binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey) - otcp.appKeyBytes = encryptKey + binary.LittleEndian.PutUint64(encryptKey, oConn.appKey) + binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey) + oConn.appKeyBytes = encryptKey } - app.tunnel.overlayConns.Store(otcp.id, &otcp) - gLog.Printf(LevelDEBUG, "Accept overlayID:%d", otcp.id) + app.tunnel.overlayConns.Store(oConn.id, &oConn) + gLog.Printf(LevelDEBUG, "Accept TCP overlayID:%d", oConn.id) // tell peer connect - req := OverlayConnectReq{ID: otcp.id, + req := OverlayConnectReq{ID: oConn.id, Token: app.tunnel.pn.config.Token, DstIP: app.config.DstHost, DstPort: app.config.DstPort, @@ -98,26 +100,117 @@ func (app *p2pApp) listenTCP() error { msgWithHead := append(relayHead.Bytes(), msg...) app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) } + go oConn.run() + } + return nil +} - go otcp.run() +func (app *p2pApp) listenUDP() error { + gLog.Printf(LevelDEBUG, "udp accept on port %d start", app.config.SrcPort) + defer gLog.Printf(LevelDEBUG, "udp accept on port %d end", app.config.SrcPort) + var err error + app.listenerUDP, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: app.config.SrcPort}) + if err != nil { + gLog.Printf(LevelERROR, "listen error:%s", err) + return err + } + buffer := make([]byte, 64*1024) + udpID := make([]byte, 8) + for { + app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10)) + len, remoteAddr, err := app.listenerUDP.ReadFrom(buffer) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } else { + gLog.Printf(LevelERROR, "udp read failed:%s", err) + break + } + } else { + b := bytes.Buffer{} + b.Write(buffer[:len]) + // load from app.tunnel.overlayConns by remoteAddr error, new udp connection + remoteIP := strings.Split(remoteAddr.String(), ":")[0] + port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1]) + a := net.ParseIP(remoteIP) + udpID[0] = a[0] + udpID[1] = a[1] + udpID[2] = a[2] + udpID[3] = a[3] + udpID[4] = byte(port) + udpID[5] = byte(port >> 8) + id := binary.LittleEndian.Uint64(udpID) + s, ok := app.tunnel.overlayConns.Load(id) + if !ok { + oConn := overlayConn{ + tunnel: app.tunnel, + connUDP: app.listenerUDP, + remoteAddr: remoteAddr, + udpRelayData: make(chan []byte, 1000), + id: id, + isClient: true, + rtid: app.rtid, + appID: app.id, + appKey: app.key, + } + // calc key bytes for encrypt + if oConn.appKey != 0 { + encryptKey := make([]byte, AESKeySize) + binary.LittleEndian.PutUint64(encryptKey, oConn.appKey) + binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey) + oConn.appKeyBytes = encryptKey + } + app.tunnel.overlayConns.Store(oConn.id, &oConn) + gLog.Printf(LevelDEBUG, "Accept UDP overlayID:%d", oConn.id) + // tell peer connect + req := OverlayConnectReq{ID: oConn.id, + Token: app.tunnel.pn.config.Token, + DstIP: app.config.DstHost, + DstPort: app.config.DstPort, + Protocol: app.config.Protocol, + AppID: app.id, + } + if app.rtid == 0 { + app.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayConnectReq, &req) + } else { + req.RelayTunnelID = app.tunnel.id + relayHead := new(bytes.Buffer) + binary.Write(relayHead, binary.LittleEndian, app.rtid) + msg, _ := newMessage(MsgP2P, MsgOverlayConnectReq, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + } + go oConn.run() + oConn.udpRelayData <- b.Bytes() + } + + // load from app.tunnel.overlayConns by remoteAddr ok, write relay data + overlayConn, ok := s.(*overlayConn) + if !ok { + continue + } + overlayConn.udpRelayData <- b.Bytes() + } } return nil } func (app *p2pApp) listen() error { - gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort) - defer gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort) + gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort) + defer gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort) app.wg.Add(1) defer app.wg.Done() app.running = true if app.rtid != 0 { go app.relayHeartbeatLoop() } - for app.running { - - app.listenTCP() - - time.Sleep(time.Second * 5) + for app.tunnel.isRuning() && app.running { + if app.config.Protocol == "udp" { + app.listenUDP() + } else { + app.listenTCP() + } + time.Sleep(time.Second * 10) } return nil } @@ -127,6 +220,9 @@ func (app *p2pApp) close() { if app.listener != nil { app.listener.Close() } + if app.listenerUDP != nil { + app.listenerUDP.Close() + } if app.tunnel != nil { app.tunnel.closeOverlayConns(app.id) } diff --git a/p2pnetwork.go b/p2pnetwork.go index 1602cd8..dbb8b00 100644 --- a/p2pnetwork.go +++ b/p2pnetwork.go @@ -107,23 +107,21 @@ func (pn *P2PNetwork) runAll() { config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort) } appExist := false - appActive := false + var appID uint64 i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) if ok { app := i.(*p2pApp) appExist = true + appID = app.id if app.isActive() { - appActive = true + continue } } - if appExist && appActive { - continue - } - if appExist && !appActive { + if appExist { pn.DeleteApp(*config) } if config.retryNum > 0 { - gLog.Printf(LevelINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum) + gLog.Printf(LevelINFO, "detect app %s(%d) disconnect, reconnecting the %d times...", config.AppName, appID, config.retryNum) if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min config.retryNum = 0 } @@ -135,12 +133,11 @@ func (pn *P2PNetwork) runAll() { increase = 900 } config.nextRetryTime = time.Now().Add(time.Second * time.Duration(increase)) // exponential increase retry time. 1.3^x - go pn.AddApp(*config) + pn.AddApp(*config) } } func (pn *P2PNetwork) autorunApp() { gLog.Println(LevelINFO, "autorunApp start") - // TODO: use gConf to check reconnect for pn.running { time.Sleep(time.Second) if !pn.online { diff --git a/p2ptunnel.go b/p2ptunnel.go index ec2dd15..6b648f8 100644 --- a/p2ptunnel.go +++ b/p2ptunnel.go @@ -276,7 +276,7 @@ func (t *P2PTunnel) readLoop() { gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID) continue } - overlayConn, ok := s.(*overlayTCP) + overlayConn, ok := s.(*overlayConn) if !ok { continue } @@ -333,35 +333,34 @@ func (t *P2PTunnel) readLoop() { overlayID := req.ID gLog.Printf(LevelDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) - var conn net.Conn - if req.Protocol == "udp" { - conn, err = net.DialTimeout("udp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) - } else { - conn, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) - } - if err != nil { - gLog.Println(LevelERROR, err) - continue - } - otcp := overlayTCP{ + oConn := overlayConn{ tunnel: t, - conn: conn, id: overlayID, isClient: false, rtid: req.RelayTunnelID, appID: req.AppID, appKey: GetKey(req.AppID), } - // calc key bytes for encrypt - if otcp.appKey != 0 { - encryptKey := make([]byte, 16) - binary.LittleEndian.PutUint64(encryptKey, otcp.appKey) - binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey) - otcp.appKeyBytes = encryptKey + if req.Protocol == "udp" { + oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort}) + } else { + oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) + } + if err != nil { + gLog.Println(LevelERROR, err) + continue } - t.overlayConns.Store(otcp.id, &otcp) - go otcp.run() + // calc key bytes for encrypt + if oConn.appKey != 0 { + encryptKey := make([]byte, 16) + binary.LittleEndian.PutUint64(encryptKey, oConn.appKey) + binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey) + oConn.appKeyBytes = encryptKey + } + + t.overlayConns.Store(oConn.id, &oConn) + go oConn.run() case MsgOverlayDisconnectReq: req := OverlayDisconnectReq{} err := json.Unmarshal(body, &req) @@ -373,8 +372,8 @@ func (t *P2PTunnel) readLoop() { gLog.Printf(LevelDEBUG, "%d disconnect overlay connection %d", t.id, overlayID) i, ok := t.overlayConns.Load(overlayID) if ok { - otcp := i.(*overlayTCP) - otcp.running = false + oConn := i.(*overlayConn) + oConn.running = false } default: } @@ -411,9 +410,16 @@ func (t *P2PTunnel) listen() error { func (t *P2PTunnel) closeOverlayConns(appID uint64) { t.overlayConns.Range(func(_, i interface{}) bool { - otcp := i.(*overlayTCP) - if otcp.appID == appID { - otcp.conn.Close() + oConn := i.(*overlayConn) + if oConn.appID == appID { + if oConn.connTCP != nil { + oConn.connTCP.Close() + oConn.connTCP = nil + } + if oConn.connUDP != nil { + oConn.connUDP.Close() + oConn.connUDP = nil + } } return true }) diff --git a/protocol.go b/protocol.go index 3180f82..9c627b2 100644 --- a/protocol.go +++ b/protocol.go @@ -10,7 +10,7 @@ import ( "time" ) -const OpenP2PVersion = "1.2.0" +const OpenP2PVersion = "1.3.0" const ProducnName string = "openp2p" type openP2PHeader struct { @@ -117,7 +117,7 @@ const ( ) const ( - ReadBuffLen = 1024 + ReadBuffLen = 4096 // for UDP maybe not enough NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow TunnelHeartbeatTime = time.Second * 15 TunnelIdleTimeout = time.Minute