optimize tcp and udp punch

This commit is contained in:
TenderIronh
2023-07-29 20:36:35 +08:00
parent 8ebdf3341e
commit 46b4f78010
8 changed files with 339 additions and 332 deletions

View File

@@ -137,8 +137,7 @@ func netInfo() *NetInfo {
continue
}
rsp := NetInfo{}
err = json.Unmarshal(buf[:n], &rsp)
if err != nil {
if err = json.Unmarshal(buf[:n], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong NetInfo:%s", err)
continue
}

View File

@@ -3,6 +3,7 @@ package openp2p
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"sync"
@@ -41,6 +42,10 @@ type AppConfig struct {
isUnderlayServer int // TODO: bool?
}
func (c *AppConfig) ID() string {
return fmt.Sprintf("%s%d", c.Protocol, c.SrcPort)
}
// TODO: add loglevel, maxlogfilesize
type Config struct {
Network NetworkConfig `json:"network"`

View File

@@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"time"
"github.com/openp2p-cn/totp"
@@ -22,62 +23,10 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function
req := PushConnectReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
break
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
err = handleConnectReq(pn, subType, msg)
case MsgPushRsp:
rsp := PushRsp{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong pushRsp:%s", err)
return err
}
@@ -88,9 +37,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
config := AppConfig{}
@@ -106,9 +54,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}(req)
case MsgPushAPPKey:
req := APPKeySync{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong APPKeySync:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
SaveKey(req.AppID, req.AppKey)
@@ -133,123 +80,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
os.Exit(0)
return err
case MsgPushReportApps:
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
pn.write(MsgReport, MsgReportApps, &req)
err = handleReportApps(pn, subType, msg)
case MsgPushReportLog:
gLog.Println(LvDEBUG, "MsgPushReportLog")
req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushReportLog:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
break
}
fi, err := f.Stat()
if err != nil {
break
}
if req.Offset == 0 && fi.Size() > 4096 {
req.Offset = fi.Size() - 4096
}
if req.Len <= 0 {
req.Len = 4096
}
f.Seek(req.Offset, 0)
if req.Len > 1024*1024 { // too large
break
}
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
break
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
pn.write(MsgReport, MsgPushReportLog, &rsp)
err = handleLog(pn, subType, msg)
case MsgPushEditApp:
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
err = handleEditApp(pn, subType, msg)
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
req := EditNode{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
@@ -259,9 +99,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
app := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
@@ -273,19 +112,193 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
req := PushDstNodeOnline{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
}
return nil
return err
}
func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
}
func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
req := PushConnectReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt/int64(time.Second)) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
return nil
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
return pn.push(req.From, MsgPushConnectRsp, rsp)
}
func handleReportApps(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
return pn.write(MsgReport, MsgReportApps, &req)
}
func handleLog(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvDEBUG, "MsgPushReportLog")
const defaultLen = 1024 * 128
const maxLen = 1024 * 1024
req := ReportLogReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
return err
}
fi, err := f.Stat()
if err != nil {
return err
}
if req.Offset > fi.Size() {
req.Offset = fi.Size() - defaultLen
}
// verify input parameters
if req.Offset < 0 {
req.Offset = 0
}
if req.Len <= 0 || req.Len > maxLen {
req.Len = defaultLen
}
f.Seek(req.Offset, 0)
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
return err
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
return pn.write(MsgReport, MsgPushReportLog, &rsp)
}

View File

@@ -84,7 +84,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
return "", 0, err
}
natRsp := NatDetectRsp{}
err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
return natRsp.IP, natRsp.Port, nil
}
@@ -121,6 +121,7 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort})
if err != nil {
gLog.Println(LvERROR, "echo server listen error:", err)
wg.Done()
return
}
buf := make([]byte, 1600)
@@ -135,7 +136,10 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn.WriteToUDP(buf[0:n], addr)
gLog.Println(LvDEBUG, "echo server end")
}()
wg.Wait() // wait echo udp
wg.Wait() // wait echo udp
if echoConn == nil { // listen error
return
}
defer echoConn.Close()
// testing for public ip
for i := 0; i < 2; i++ {

View File

@@ -10,6 +10,7 @@ import (
"math/rand"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
@@ -112,30 +113,18 @@ func (pn *P2PNetwork) runAll() {
allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps
for _, config := range allApps {
if config.nextRetryTime.After(time.Now()) {
continue
}
if config.Enabled == 0 {
if config.nextRetryTime.After(time.Now()) || config.Enabled == 0 || config.retryNum >= retryLimit {
continue
}
if config.AppName == "" {
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)
config.AppName = config.ID()
}
appExist := false
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
appExist = true
if app.isActive() {
if i, ok := pn.apps.Load(config.ID()); ok {
if app := i.(*p2pApp); app.isActive() {
continue
}
}
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum >= retryLimit {
continue
}
if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
@@ -160,6 +149,7 @@ func (pn *P2PNetwork) runAll() {
}
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start")
pn.wgReconnect.Add(1)
@@ -181,9 +171,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err := json.Unmarshal(body, &rsp); err != nil {
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
@@ -216,9 +204,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error")
}
rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err = json.Unmarshal(body, &rspID); err != nil {
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
return t, rspID.ID, rsp.Mode, err
@@ -233,7 +219,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
}
// check if app already exist?
appExist := false
_, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
_, ok := pn.apps.Load(config.ID())
if ok {
appExist = true
}
@@ -303,7 +289,8 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
relayNode: relayNode,
relayMode: relayMode,
hbTime: time.Now()}
pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app)
pn.apps.Store(config.ID(), &app)
gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id)
if err == nil {
go app.listen()
}
@@ -314,16 +301,37 @@ func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort)
defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort)
// close the apps of this config
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
gLog.Printf(LvINFO, "app %s exist, delete it", app.config.AppName)
app.close()
pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
pn.apps.Delete(config.ID())
}
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) {
func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) {
// find existing tunnel to peer
pn.allTunnels.Range(func(id, i interface{}) bool {
tmpt := i.(*P2PTunnel)
if tmpt.config.PeerNode == config.PeerNode {
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := tmpt.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
tmpt.close()
} else {
t = tmpt
}
return false
}
return true
})
return t
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
isClient := false
@@ -332,64 +340,34 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
tid = rand.Uint64()
isClient = true
}
exist := false
// find existing tunnel to peer
var t *P2PTunnel
pn.allTunnels.Range(func(id, i interface{}) bool {
t = i.(*P2PTunnel)
if t.config.PeerNode == config.PeerNode {
// server side force close existing tunnel
if !isClient {
t.close()
return false
}
// client side checking
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := t.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
t.close()
} else {
// active
exist = true
}
return false
}
return true
})
if exist {
if t = pn.findTunnel(&config); t != nil {
return t, nil
}
// create tunnel if not exist
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50)
pn.msgMapMtx.Unlock()
// server side
if !isClient {
err := pn.newTunnel(t, tid, isClient)
t, err = pn.newTunnel(config, tid, isClient)
return t, err // always return
}
// client side
// peer info
initErr := t.requestPeerInfo()
initErr := pn.requestPeerInfo(&config)
if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr
}
var err error
// try TCP6
if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) {
if IsIPv6(config.peerIPv6) && IsIPv6(pn.config.publicIPv6) {
gLog.Println(LvINFO, "try TCP6")
t.config.linkMode = LinkModeTCP6
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
config.linkMode = LinkModeTCP6
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
@@ -397,59 +375,74 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
// TODO: try UDP6
// try TCP4
if t.config.hasIPv4 == 1 || t.pn.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 || t.pn.config.hasUPNPorNATPMP == 1 {
if config.hasIPv4 == 1 || pn.config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 || pn.config.hasUPNPorNATPMP == 1 {
gLog.Println(LvINFO, "try TCP4")
t.config.linkMode = LinkModeTCP4
if t.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 {
t.config.isUnderlayServer = 0
config.linkMode = LinkModeTCP4
if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 {
config.isUnderlayServer = 0
} else {
t.config.isUnderlayServer = 1
config.isUnderlayServer = 1
}
if err = pn.newTunnel(t, tid, isClient); err == nil {
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
// TODO: try UDP4
// try TCPPunch
if t.config.peerNatType == NATCone && t.pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
t.config.linkMode = LinkModeTCPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
config.linkMode = LinkModeTCPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
}
}
}
// try UDPPunch
if t.config.peerNatType == NATCone || t.pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
t.config.linkMode = LinkModeUDPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone || pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
config.linkMode = LinkModeUDPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
if !(config.peerNatType == NATCone && pn.config.natType == NATCone) { // not cone2cone, no more try
break
}
}
return nil, ErrorHandshake // only ErrorHandshake will try relay
}
func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) {
if existTunnel := pn.findTunnel(&config); existTunnel != nil {
return existTunnel, nil
}
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
t.initPort()
if isClient {
if err := t.connect(); err != nil {
if err = t.connect(); err != nil {
gLog.Println(LvERROR, "p2pTunnel connect error:", err)
return err
return
}
} else {
if err := t.listen(); err != nil {
if err = t.listen(); err != nil {
gLog.Println(LvERROR, "p2pTunnel listen error:", err)
return err
return
}
}
// store it when success
gLog.Printf(LvDEBUG, "store tunnel %d", tid)
pn.allTunnels.Store(tid, t)
return nil
return
}
func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start")
@@ -552,9 +545,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
case MsgLogin:
// gLog.Println(LevelINFO,string(msg))
rsp := LoginRsp{}
err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong login response:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return
}
if rsp.Error != 0 {
@@ -746,3 +738,30 @@ func (pn *P2PNetwork) refreshIPv6(force bool) {
}
pn.config.publicIPv6 = string(buf[:n])
}
func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error {
// request peer info
pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode})
head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
if err := json.Unmarshal(body, &rsp); err != nil {
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
config.peerVersion = rsp.Version
config.hasIPv4 = rsp.HasIPv4
config.peerIP = rsp.IPv4
config.peerIPv6 = rsp.IPv6
config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
config.peerNatType = rsp.NatType
///
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"math/rand"
"net"
"reflect"
"sync"
"time"
)
@@ -32,34 +33,6 @@ type P2PTunnel struct {
punchTs uint64
}
func (t *P2PTunnel) requestPeerInfo() error {
// request peer info
t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode})
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong QueryPeerInfoRsp:%s", err)
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
t.config.peerVersion = rsp.Version
t.config.hasIPv4 = rsp.HasIPv4
t.config.peerIP = rsp.IPv4
t.config.peerIPv6 = rsp.IPv6
t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
t.config.peerNatType = rsp.NatType
///
return nil
}
func (t *P2PTunnel) initPort() {
t.running = true
t.hbMtx.Lock()
@@ -118,9 +91,8 @@ func (t *P2PTunnel) connect() error {
return errors.New("connect error")
}
rsp := PushConnectRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err)
if err := json.Unmarshal(body, &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return err
}
// gLog.Println(LevelINFO, rsp)
@@ -135,7 +107,7 @@ func (t *P2PTunnel) connect() error {
t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP
t.punchTs = rsp.PunchTs
err = t.start()
err := t.start()
if err != nil {
gLog.Println(LvERROR, "handshake error:", err)
err = ErrorHandshake
@@ -165,14 +137,10 @@ func (t *P2PTunnel) isActive() bool {
}
func (t *P2PTunnel) checkActive() bool {
hbt := time.Now()
t.hbMtx.Lock()
if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) {
t.hbMtx.Unlock()
if t.conn == nil {
return false
}
t.hbMtx.Unlock()
// hbtime within TunnelHeartbeatTime, check it now
hbt := time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
isActive := false
// wait at most 5s
@@ -184,6 +152,7 @@ func (t *P2PTunnel) checkActive() bool {
t.hbMtx.Unlock()
time.Sleep(time.Millisecond * 100)
}
gLog.Printf(LvINFO, "checkActive %t. hbtime=%d", isActive, t.hbTime)
return isActive
}
@@ -451,6 +420,7 @@ func (t *P2PTunnel) readLoop() {
}
switch head.SubType {
case MsgTunnelHeartbeat:
t.hbTime = time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil)
gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id)
case MsgTunnelHeartbeatAck:
@@ -492,9 +462,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.relay(tunnelID, body[8:])
case MsgRelayHeartbeat:
req := RelayHeartbeat{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID)
@@ -514,9 +483,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq:
req := OverlayConnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
// app connect only accept token(not relay totp token), avoid someone using the share relay node's token
@@ -559,9 +527,8 @@ func (t *P2PTunnel) readLoop() {
go oConn.run()
case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
overlayID := req.ID

View File

@@ -10,7 +10,7 @@ import (
"time"
)
const OpenP2PVersion = "3.9.1"
const OpenP2PVersion = "3.9.11"
const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0"
@@ -135,17 +135,18 @@ const (
const (
ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15
TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s
TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond
HandshakeTimeout = time.Second * 5
PeerAddRelayTimeount = HandshakeTimeout * 2
PeerAddRelayTimeount = time.Second * 30 // peer need times
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
Cone2ConePunchMaxRetry = 3
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5

View File

@@ -43,8 +43,7 @@ func update(host string, port int) {
return
}
updateInfo := UpdateInfo{}
err = json.Unmarshal(rspBuf, &updateInfo)
if err != nil {
if err = json.Unmarshal(rspBuf, &updateInfo); err != nil {
gLog.Println(LvERROR, rspBuf, " update info decode error:", err)
return
}