Compare commits

..

7 Commits

Author SHA1 Message Date
TenderIronh
e21adebc26 3.10.3 2023-08-09 22:52:29 +08:00
OpenP2P
b2a7619bd6 Merge pull request #43 from W192547975/fixbug
Bug fixing from 2023/8/1
2023-08-08 16:50:25 +08:00
W192547975
fe4022ba6c Update openp2p.go 2023-08-06 23:38:14 +08:00
W192547975
82c74b4f85 Update config.go 2023-08-06 23:31:17 +08:00
TenderIronh
0af65b7204 tls verify server and support docker 2023-08-03 23:05:45 +08:00
TenderIronh
46b4f78010 optimize tcp and udp punch 2023-07-29 20:36:35 +08:00
TenderIronh
8ebdf3341e 3.9.1 2023-07-21 22:25:33 +08:00
25 changed files with 973 additions and 447 deletions

View File

@@ -97,3 +97,11 @@ C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos # linux,macos
sudo /usr/local/openp2p/openp2p uninstall sudo /usr/local/openp2p/openp2p uninstall
``` ```
## Docker运行
```
# 把YOUR-TOKEN和YOUR-NODE-NAME替换成自己的
docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -99,3 +99,11 @@ C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos # linux,macos
sudo /usr/local/openp2p/openp2p uninstall sudo /usr/local/openp2p/openp2p uninstall
``` ```
## Run with Docker
```
# Replace YOUR-TOKEN and YOUR-NODE-NAME with yours
docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -1,7 +1,9 @@
package main package main
import openp2p "openp2p/core" import (
core "openp2p/core"
)
func main() { func main() {
openp2p.Run() core.Run()
} }

View File

@@ -137,8 +137,7 @@ func netInfo() *NetInfo {
continue continue
} }
rsp := NetInfo{} rsp := NetInfo{}
err = json.Unmarshal(buf[:n], &rsp) if err = json.Unmarshal(buf[:n], &rsp); err != nil {
if err != nil {
gLog.Printf(LvERROR, "wrong NetInfo:%s", err) gLog.Printf(LvERROR, "wrong NetInfo:%s", err)
continue continue
} }

View File

@@ -3,8 +3,10 @@ package openp2p
import ( import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"strconv"
"sync" "sync"
"time" "time"
) )
@@ -15,6 +17,7 @@ type AppConfig struct {
// required // required
AppName string AppName string
Protocol string Protocol string
Whitelist string
SrcPort int SrcPort int
PeerNode string PeerNode string
DstPort int DstPort int
@@ -41,6 +44,10 @@ type AppConfig struct {
isUnderlayServer int // TODO: bool? isUnderlayServer int // TODO: bool?
} }
func (c *AppConfig) ID() string {
return fmt.Sprintf("%s%d", c.Protocol, c.SrcPort)
}
// TODO: add loglevel, maxlogfilesize // TODO: add loglevel, maxlogfilesize
type Config struct { type Config struct {
Network NetworkConfig `json:"network"` Network NetworkConfig `json:"network"`
@@ -201,6 +208,7 @@ func parseParams(subCommand string) {
node := fset.String("node", "", "node name. 8-31 characters. if not set, it will be hostname") node := fset.String("node", "", "node name. 8-31 characters. if not set, it will be hostname")
peerNode := fset.String("peernode", "", "peer node name that you want to connect") peerNode := fset.String("peernode", "", "peer node name that you want to connect")
dstIP := fset.String("dstip", "127.0.0.1", "destination ip ") dstIP := fset.String("dstip", "127.0.0.1", "destination ip ")
whiteList := fset.String("whitelist", "", "whitelist for p2pApp ")
dstPort := fset.Int("dstport", 0, "destination port ") dstPort := fset.Int("dstport", 0, "destination port ")
srcPort := fset.Int("srcport", 0, "source port ") srcPort := fset.Int("srcport", 0, "source port ")
tcpPort := fset.Int("tcpport", 0, "tcp port for upnp or publicip") tcpPort := fset.Int("tcpport", 0, "tcp port for upnp or publicip")
@@ -210,7 +218,7 @@ func parseParams(subCommand string) {
daemonMode := fset.Bool("d", false, "daemonMode") daemonMode := fset.Bool("d", false, "daemonMode")
notVerbose := fset.Bool("nv", false, "not log console") notVerbose := fset.Bool("nv", false, "not log console")
newconfig := fset.Bool("newconfig", false, "not load existing config.json") newconfig := fset.Bool("newconfig", false, "not load existing config.json")
logLevel := fset.Int("loglevel", 0, "0:info 1:warn 2:error 3:debug") logLevel := fset.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
if subCommand == "" { // no subcommand if subCommand == "" { // no subcommand
fset.Parse(os.Args[1:]) fset.Parse(os.Args[1:])
} else { } else {
@@ -220,6 +228,7 @@ func parseParams(subCommand string) {
config := AppConfig{Enabled: 1} config := AppConfig{Enabled: 1}
config.PeerNode = *peerNode config.PeerNode = *peerNode
config.DstHost = *dstIP config.DstHost = *dstIP
config.Whitelist = *whiteList
config.DstPort = *dstPort config.DstPort = *dstPort
config.SrcPort = *srcPort config.SrcPort = *srcPort
config.Protocol = *protocol config.Protocol = *protocol
@@ -253,17 +262,17 @@ func parseParams(subCommand string) {
gConf.setToken(*token) gConf.setToken(*token)
} }
}) })
// set default value
if gConf.Network.ServerHost == "" { if gConf.Network.ServerHost == "" {
gConf.Network.ServerHost = *serverHost gConf.Network.ServerHost = *serverHost
} }
if *node != "" { if *node != "" {
if len(*node) < MinNodeNameLen {
gLog.Println(LvERROR, ErrNodeTooShort)
os.Exit(9)
}
gConf.Network.Node = *node gConf.Network.Node = *node
} else { } else {
envNode := os.Getenv("OPENP2P_NODE")
if envNode != "" {
gConf.Network.Node = envNode
}
if gConf.Network.Node == "" { // if node name not set. use os.Hostname if gConf.Network.Node == "" { // if node name not set. use os.Hostname
gConf.Network.Node = defaultNodeName() gConf.Network.Node = defaultNodeName()
} }
@@ -275,7 +284,14 @@ func parseParams(subCommand string) {
} }
gConf.Network.TCPPort = *tcpPort gConf.Network.TCPPort = *tcpPort
} }
if *token == 0 {
envToken := os.Getenv("OPENP2P_TOKEN")
if envToken != "" {
if n, err := strconv.ParseUint(envToken, 10, 64); n != 0 && err == nil {
gConf.setToken(n)
}
}
}
gConf.Network.ServerPort = *serverPort gConf.Network.ServerPort = *serverPort
gConf.Network.UDPPort1 = UDPPort1 gConf.Network.UDPPort1 = UDPPort1
gConf.Network.UDPPort2 = UDPPort2 gConf.Network.UDPPort2 = UDPPort2

View File

@@ -8,6 +8,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"reflect"
"time" "time"
"github.com/openp2p-cn/totp" "github.com/openp2p-cn/totp"
@@ -22,63 +23,10 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType { switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function case MsgPushConnectReq: // TODO: handle a msg move to a new function
req := PushConnectReq{} err = handleConnectReq(pn, subType, msg)
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts
t.Verify(req.Token, pn.config.Token, time.Now().Unix()) {
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
break
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
case MsgPushRsp: case MsgPushRsp:
rsp := PushRsp{} rsp := PushRsp{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp) if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
if err != nil {
gLog.Printf(LvERROR, "wrong pushRsp:%s", err) gLog.Printf(LvERROR, "wrong pushRsp:%s", err)
return err return err
} }
@@ -89,9 +37,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
} }
case MsgPushAddRelayTunnelReq: case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{} req := AddRelayTunnelReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
return err return err
} }
config := AppConfig{} config := AppConfig{}
@@ -107,9 +54,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}(req) }(req)
case MsgPushAPPKey: case MsgPushAPPKey:
req := APPKeySync{} req := APPKeySync{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
gLog.Printf(LvERROR, "wrong APPKeySync:%s", err)
return err return err
} }
SaveKey(req.AppID, req.AppKey) SaveKey(req.AppID, req.AppKey)
@@ -134,6 +80,139 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
os.Exit(0) os.Exit(0)
return err return err
case MsgPushReportApps: case MsgPushReportApps:
err = handleReportApps(pn, subType, msg)
case MsgPushReportLog:
err = handleLog(pn, subType, msg)
case MsgPushEditApp:
err = handleEditApp(pn, subType, msg)
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
req := EditNode{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
// TODO: hot reload
os.Exit(0)
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
app := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
gLog.Println(LvINFO, app.AppName, " switch to ", app.Enabled)
gConf.switchApp(config, app.Enabled)
if app.Enabled == 0 {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
req := PushDstNodeOnline{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
}
return err
}
func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.Whitelist = newApp.Whitelist
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
}
func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
req := PushConnectReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt/int64(time.Second)) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
return nil
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
return pn.push(req.From, MsgPushConnectRsp, rsp)
}
func handleReportApps(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushReportApps") gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{} req := ReportApps{}
gConf.mtx.Lock() gConf.mtx.Lock()
@@ -143,7 +222,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
relayNode := "" relayNode := ""
relayMode := "" relayMode := ""
linkMode := LinkModeUDPPunch linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) i, ok := pn.apps.Load(config.ID())
if ok { if ok {
app := i.(*p2pApp) app := i.(*p2pApp)
if app.isActive() { if app.isActive() {
@@ -157,6 +236,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
AppName: config.AppName, AppName: config.AppName,
Error: config.errMsg, Error: config.errMsg,
Protocol: config.Protocol, Protocol: config.Protocol,
Whitelist: config.Whitelist,
SrcPort: config.SrcPort, SrcPort: config.SrcPort,
RelayNode: relayNode, RelayNode: relayNode,
RelayMode: relayMode, RelayMode: relayMode,
@@ -174,13 +254,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
} }
req.Apps = append(req.Apps, appInfo) req.Apps = append(req.Apps, appInfo)
} }
pn.write(MsgReport, MsgReportApps, &req) return pn.write(MsgReport, MsgReportApps, &req)
case MsgPushReportLog: }
func handleLog(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvDEBUG, "MsgPushReportLog") gLog.Println(LvDEBUG, "MsgPushReportLog")
const defaultLen = 1024 * 128
const maxLen = 1024 * 1024
req := ReportLogReq{} req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req) if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
gLog.Printf(LvERROR, "wrong MsgPushReportLog:%s %s", err, string(msg[openP2PHeaderSize:]))
return err return err
} }
if req.FileName == "" { if req.FileName == "" {
@@ -189,104 +272,35 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
f, err := os.Open(filepath.Join("log", req.FileName)) f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil { if err != nil {
gLog.Println(LvERROR, "read log file error:", err) gLog.Println(LvERROR, "read log file error:", err)
break return err
} }
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
break return err
} }
if req.Offset == 0 && fi.Size() > 4096 { if req.Offset > fi.Size() {
req.Offset = fi.Size() - 4096 req.Offset = fi.Size() - defaultLen
} }
if req.Len <= 0 { // verify input parameters
req.Len = 4096 if req.Offset < 0 {
req.Offset = 0
} }
if req.Len <= 0 || req.Len > maxLen {
req.Len = defaultLen
}
f.Seek(req.Offset, 0) f.Seek(req.Offset, 0)
if req.Len > 1024*1024 { // too large
break
}
buff := make([]byte, req.Len) buff := make([]byte, req.Len)
readLength, err := f.Read(buff) readLength, err := f.Read(buff)
f.Close() f.Close()
if err != nil { if err != nil {
gLog.Println(LvERROR, "read log content error:", err) gLog.Println(LvERROR, "read log content error:", err)
break return err
} }
rsp := ReportLogRsp{} rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength]) rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName rsp.FileName = req.FileName
rsp.Total = fi.Size() rsp.Total = fi.Size()
rsp.Len = req.Len rsp.Len = req.Len
pn.write(MsgReport, MsgPushReportLog, &rsp) return pn.write(MsgReport, MsgPushReportLog, &rsp)
case MsgPushEditApp:
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
req := EditNode{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
// TODO: hot reload
os.Exit(0)
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
app := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
gLog.Println(LvINFO, app.AppName, " switch to ", app.Enabled)
gConf.switchApp(config, app.Enabled)
if app.Enabled == 0 {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
}
return nil
} }

View File

@@ -23,11 +23,11 @@ func handshakeC2C(t *P2PTunnel) (err error) {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err)
return err return err
} }
ra, head, _, _, err := UDPRead(conn, SymmetricHandshakeAckTimeout) ra, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read") gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read")
ra, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) ra, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err return err
@@ -38,7 +38,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "read %d handshake ", t.id) gLog.Printf(LvDEBUG, "read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) _, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
return err return err
@@ -66,7 +66,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start") gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end") defer gLog.Printf(LvDEBUG, "handshakeC2S end")
// even if read timeout, continue handshake // even if read timeout, continue handshake
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout) t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout)
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532) randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la) conn, err := net.ListenUDP("udp", t.la)
@@ -92,7 +92,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Println(LvDEBUG, "send symmetric handshake end") gLog.Println(LvDEBUG, "send symmetric handshake end")
return nil return nil
}() }()
deadline := time.Now().Add(SymmetricHandshakeAckTimeout) deadline := time.Now().Add(HandshakeTimeout)
err = conn.SetReadDeadline(deadline) err = conn.SetReadDeadline(deadline)
if err != nil { if err != nil {
gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
@@ -140,7 +140,7 @@ func handshakeS2C(t *P2PTunnel) error {
} }
defer conn.Close() defer conn.Close()
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
_, head, _, _, err := UDPRead(conn, SymmetricHandshakeAckTimeout) _, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
// gLog.Println(LevelDEBUG, "one of the handshake error:", err) // gLog.Println(LevelDEBUG, "one of the handshake error:", err)
return err return err
@@ -155,7 +155,7 @@ func handshakeS2C(t *P2PTunnel) error {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id) gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) _, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "handshakeS2C handshake error") gLog.Println(LvDEBUG, "handshakeS2C handshake error")
return err return err
@@ -174,7 +174,7 @@ func handshakeS2C(t *P2PTunnel) error {
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
select { select {
case <-time.After(SymmetricHandshakeAckTimeout): case <-time.After(HandshakeTimeout):
return fmt.Errorf("wait handshake failed") return fmt.Errorf("wait handshake failed")
case la := <-gotCh: case la := <-gotCh:
gLog.Println(LvDEBUG, "symmetric handshake ok", la) gLog.Println(LvDEBUG, "symmetric handshake ok", la)

155
core/iptree.go Normal file
View File

@@ -0,0 +1,155 @@
package openp2p
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"net"
"strings"
"sync"
"github.com/emirpasic/gods/trees/avltree"
"github.com/emirpasic/gods/utils"
)
type IPTree struct {
tree *avltree.Tree
treeMtx sync.RWMutex
}
// add 120k cost 0.5s
func (iptree *IPTree) AddIntIP(minIP uint32, maxIP uint32) bool {
if minIP > maxIP {
return false
}
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
newMinIP := minIP
newMaxIP := maxIP
cur := iptree.tree.Root
for {
if cur == nil {
break
}
curMaxIP := cur.Value.(uint32)
curMinIP := cur.Key.(uint32)
// newNode all in existNode, treat as inserted.
if newMinIP >= curMinIP && newMaxIP <= curMaxIP {
return true
}
// has no interset
if newMinIP > curMaxIP {
cur = cur.Children[1]
continue
}
if newMaxIP < curMinIP {
cur = cur.Children[0]
continue
}
// has interset, rm it and Add the new merged ip segment
iptree.tree.Remove(curMinIP)
if curMinIP < newMinIP {
newMinIP = curMinIP
}
if curMaxIP > newMaxIP {
newMaxIP = curMaxIP
}
cur = iptree.tree.Root
}
// put in the tree
iptree.tree.Put(newMinIP, newMaxIP)
return true
}
func (iptree *IPTree) Add(minIPStr string, maxIPStr string) bool {
var minIP, maxIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP(minIPStr).To4()), binary.BigEndian, &minIP)
binary.Read(bytes.NewBuffer(net.ParseIP(maxIPStr).To4()), binary.BigEndian, &maxIP)
return iptree.AddIntIP(minIP, maxIP)
}
func (iptree *IPTree) Contains(ipStr string) bool {
var ip uint32
binary.Read(bytes.NewBuffer(net.ParseIP(ipStr).To4()), binary.BigEndian, &ip)
return iptree.ContainsInt(ip)
}
func (iptree *IPTree) ContainsInt(ip uint32) bool {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
if iptree.tree == nil {
return false
}
n := iptree.tree.Root
for n != nil {
curMaxIP := n.Value.(uint32)
curMinIP := n.Key.(uint32)
switch {
case ip >= curMinIP && ip <= curMaxIP: // hit
return true
case ip < curMinIP:
n = n.Children[0]
default:
n = n.Children[1]
}
}
return false
}
func (iptree *IPTree) Size() int {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
return iptree.tree.Size()
}
func (iptree *IPTree) Print() {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
log.Println("size:", iptree.Size())
log.Println(iptree.tree.String())
}
func (iptree *IPTree) Clear() {
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
iptree.tree.Clear()
}
// input format 127.0.0.1,192.168.1.0/24,10.1.1.30-10.1.1.50
// 127.0.0.1
// 192.168.1.0/24
// 192.168.1.1-192.168.1.10
func NewIPTree(ips string) *IPTree {
iptree := &IPTree{
tree: avltree.NewWith(utils.UInt32Comparator),
}
ipArr := strings.Split(ips, ",")
for _, ip := range ipArr {
if strings.Contains(ip, "/") { // x.x.x.x/24
_, ipNet, err := net.ParseCIDR(ip)
if err != nil {
fmt.Println("Error parsing CIDR:", err)
continue
}
minIP := ipNet.IP.Mask(ipNet.Mask).String()
maxIP := calculateMaxIP(ipNet).String()
iptree.Add(minIP, maxIP)
} else if strings.Contains(ip, "-") { // x.x.x.x-y.y.y.y
minAndMax := strings.Split(ip, "-")
iptree.Add(minAndMax[0], minAndMax[1])
} else { // single ip
iptree.Add(ip, ip)
}
}
return iptree
}
func calculateMaxIP(ipNet *net.IPNet) net.IP {
maxIP := make(net.IP, len(ipNet.IP))
copy(maxIP, ipNet.IP)
for i := range maxIP {
maxIP[i] |= ^ipNet.Mask[i]
}
return maxIP
}

171
core/iptree_test.go Normal file
View File

@@ -0,0 +1,171 @@
package openp2p
import (
"bytes"
"encoding/binary"
"net"
"testing"
)
func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func wrapBenchmarkContains(t *testing.B, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func TestAllInputFormat(t *testing.T) {
iptree := NewIPTree("219.137.185.70,127.0.0.1,127.0.0.0/8,192.168.1.0/24,192.168.3.100-192.168.3.255,192.168.100.0-192.168.200.255")
wrapTestContains(t, iptree, "127.0.0.1", true)
wrapTestContains(t, iptree, "127.0.0.2", true)
wrapTestContains(t, iptree, "127.1.1.1", true)
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
wrapTestContains(t, iptree, "192.168.1.2", true)
wrapTestContains(t, iptree, "192.168.2.2", false)
wrapTestContains(t, iptree, "192.168.3.1", false)
wrapTestContains(t, iptree, "192.168.3.100", true)
wrapTestContains(t, iptree, "192.168.3.255", true)
wrapTestContains(t, iptree, "192.168.150.1", true)
wrapTestContains(t, iptree, "192.168.250.1", false)
}
func TestSingleIP(t *testing.T) {
iptree := NewIPTree("")
iptree.Add("219.137.185.70", "219.137.185.70")
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
}
func TestWrongSegment(t *testing.T) {
iptree := NewIPTree("")
inserted := iptree.Add("87.251.75.0", "82.251.75.255")
if inserted {
t.Errorf("TestWrongSegment failed\n")
}
}
func TestSegment2(t *testing.T) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Print()
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Print()
iptree.Add("10.1.1.90", "10.1.1.110") // interset
iptree.Print()
t.Logf("blocklist size:%d\n", iptree.Size())
wrapTestContains(t, iptree, "10.1.1.40", true)
wrapTestContains(t, iptree, "10.1.5.50", true)
wrapTestContains(t, iptree, "10.1.6.50", true)
wrapTestContains(t, iptree, "10.1.7.50", true)
wrapTestContains(t, iptree, "10.1.2.50", true)
wrapTestContains(t, iptree, "10.1.3.50", true)
wrapTestContains(t, iptree, "10.1.1.60", true)
wrapTestContains(t, iptree, "10.1.1.90", true)
wrapTestContains(t, iptree, "10.1.1.110", true)
wrapTestContains(t, iptree, "10.1.1.250", true)
wrapTestContains(t, iptree, "10.1.2.60", true)
wrapTestContains(t, iptree, "10.1.100.30", false)
wrapTestContains(t, iptree, "10.1.200.30", false)
iptree.Add("10.0.0.0", "10.255.255.255") // will merge all segment
iptree.Print()
if iptree.Size() != 1 {
t.Errorf("merge ip segment error\n")
}
}
func BenchmarkBuildBlockList20k(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 1)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("blocklist size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("blocklist size:%d\n", iptree.Size())
iptree.Clear()
t.Logf("clear. blocklist size:%d\n", iptree.Size())
}
func BenchmarkQuery(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 100)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("blocklist size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("blocklist size:%d\n", iptree.Size())
t.ResetTimer()
queryNum := 100 * 10000
for i := 0; i < queryNum; i++ {
iptree.ContainsInt(minIP + uint32(i))
wrapBenchmarkContains(t, iptree, "10.1.5.55", true)
wrapBenchmarkContains(t, iptree, "10.1.1.1", true)
wrapBenchmarkContains(t, iptree, "10.1.5.200", false)
wrapBenchmarkContains(t, iptree, "200.1.1.1", false)
}
t.Logf("query list:%d\n", queryNum*4)
}

View File

@@ -51,6 +51,7 @@ type logger struct {
pid int pid int
maxLogSize int64 maxLogSize int64
mode int mode int
stdLogger *log.Logger
} }
func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger { func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger {
@@ -73,7 +74,7 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
} }
os.Chmod(logFilePath, 0644) os.Chmod(logFilePath, 0644)
logfiles[lv] = f logfiles[lv] = f
loggers[lv] = log.New(f, "", log.LstdFlags) loggers[lv] = log.New(f, "", log.LstdFlags|log.Lmicroseconds)
} }
var le string var le string
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
@@ -81,7 +82,8 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
} else { } else {
le = "\n" le = "\n"
} }
pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode} pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode, log.New(os.Stdout, "", 0)}
pLog.stdLogger.SetFlags(log.LstdFlags | log.Lmicroseconds)
go pLog.checkFile() go pLog.checkFile()
return pLog return pLog
} }
@@ -142,7 +144,7 @@ func (l *logger) Printf(level LogLevel, format string, params ...interface{}) {
l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...) l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...)
} }
if l.mode == LogConsole || l.mode == LogFileAndConsole { if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Printf("%d %s "+format+l.lineEnding, params...) l.stdLogger.Printf("%d %s "+format+l.lineEnding, params...)
} }
} }
@@ -159,6 +161,6 @@ func (l *logger) Println(level LogLevel, params ...interface{}) {
l.loggers[0].Print(params...) l.loggers[0].Print(params...)
} }
if l.mode == LogConsole || l.mode == LogFileAndConsole { if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Print(params...) l.stdLogger.Print(params...)
} }
} }

View File

@@ -3,7 +3,6 @@ package openp2p
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"math/rand" "math/rand"
"net" "net"
"strconv" "strconv"
@@ -14,20 +13,22 @@ import (
reuse "github.com/openp2p-cn/go-reuseport" reuse "github.com/openp2p-cn/go-reuseport"
) )
func natTCP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int) { func natTCP(serverHost string, serverPort int) (publicIP string, publicPort int, localPort int) {
// dialer := &net.Dialer{ // dialer := &net.Dialer{
// LocalAddr: &net.TCPAddr{ // LocalAddr: &net.TCPAddr{
// IP: net.ParseIP("0.0.0.0"), // IP: net.ParseIP("0.0.0.0"),
// Port: localPort, // Port: localPort,
// }, // },
// } // }
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", localPort), fmt.Sprintf("%s:%d", serverHost, serverPort), time.Second*5) conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", 0), fmt.Sprintf("%s:%d", serverHost, serverPort), NatTestTimeout)
// conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort)) // conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort))
// log.Println(LvINFO, conn.LocalAddr())
if err != nil { if err != nil {
fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err) fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err)
return return
} }
defer conn.Close() defer conn.Close()
localPort, _ = strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1])
_, wrerr := conn.Write([]byte("1")) _, wrerr := conn.Write([]byte("1"))
if wrerr != nil { if wrerr != nil {
fmt.Printf("Write error: %s\n", wrerr) fmt.Printf("Write error: %s\n", wrerr)
@@ -83,7 +84,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
return "", 0, err return "", 0, err
} }
natRsp := NatDetectRsp{} natRsp := NatDetectRsp{}
err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp) json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
return natRsp.IP, natRsp.Port, nil return natRsp.IP, natRsp.Port, nil
} }
@@ -120,6 +121,7 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort}) echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort})
if err != nil { if err != nil {
gLog.Println(LvERROR, "echo server listen error:", err) gLog.Println(LvERROR, "echo server listen error:", err)
wg.Done()
return return
} }
buf := make([]byte, 1600) buf := make([]byte, 1600)
@@ -135,6 +137,9 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
gLog.Println(LvDEBUG, "echo server end") gLog.Println(LvDEBUG, "echo server end")
}() }()
wg.Wait() // wait echo udp wg.Wait() // wait echo udp
if echoConn == nil { // listen error
return
}
defer echoConn.Close() defer echoConn.Close()
// testing for public ip // testing for public ip
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
@@ -151,14 +156,14 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
gLog.Println(LvDEBUG, "could not perform UPNP external address:", err) gLog.Println(LvDEBUG, "could not perform UPNP external address:", err)
break break
} }
log.Println("PublicIP:", ext) gLog.Println(LvINFO, "PublicIP:", ext)
externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 604800) // 7 days, upnp will perform failed when os start externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30) // 30 seconds fot upnp testing
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort) gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort)
break break
} else { } else {
nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days for tcp connection
} }
} }
gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort) gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort)

View File

@@ -21,7 +21,6 @@ func Run() {
fmt.Println(OpenP2PVersion) fmt.Println(OpenP2PVersion)
return return
case "update": case "update":
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
targetPath := filepath.Join(defaultInstallPath, defaultBinName) targetPath := filepath.Join(defaultInstallPath, defaultBinName)
d := daemon{} d := daemon{}
err := d.Control("restart", targetPath, nil) err := d.Control("restart", targetPath, nil)

View File

@@ -84,7 +84,6 @@ func (oConn *overlayConn) run() {
} }
oConn.tunnel.overlayConns.Delete(oConn.id) oConn.tunnel.overlayConns.Delete(oConn.id)
// notify peer disconnect // notify peer disconnect
if oConn.isClient {
req := OverlayDisconnectReq{ID: oConn.id} req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 { if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
@@ -94,7 +93,6 @@ func (oConn *overlayConn) run() {
msgWithHead := append(relayHead.Bytes(), msg...) msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
} }
}
} }
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) { func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) {

View File

@@ -17,6 +17,7 @@ type p2pApp struct {
listener net.Listener listener net.Listener
listenerUDP *net.UDPConn listenerUDP *net.UDPConn
tunnel *P2PTunnel tunnel *P2PTunnel
iptree *IPTree
rtid uint64 // relay tunnelID rtid uint64 // relay tunnelID
relayNode string relayNode string
relayMode string relayMode string
@@ -64,6 +65,15 @@ func (app *p2pApp) listenTCP() error {
} }
break break
} }
// check white list
if app.config.Whitelist != "" {
remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
if !app.iptree.Contains(remoteIP) {
conn.Close()
gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
continue
}
}
oConn := overlayConn{ oConn := overlayConn{
tunnel: app.tunnel, tunnel: app.tunnel,
connTCP: conn, connTCP: conn,
@@ -82,7 +92,7 @@ func (app *p2pApp) listenTCP() error {
oConn.appKeyBytes = encryptKey oConn.appKeyBytes = encryptKey
} }
app.tunnel.overlayConns.Store(oConn.id, &oConn) app.tunnel.overlayConns.Store(oConn.id, &oConn)
gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d", oConn.id) gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d, %s", oConn.id, oConn.connTCP.RemoteAddr())
// tell peer connect // tell peer connect
req := OverlayConnectReq{ID: oConn.id, req := OverlayConnectReq{ID: oConn.id,
Token: app.tunnel.pn.config.Token, Token: app.tunnel.pn.config.Token,

View File

@@ -3,6 +3,7 @@ package openp2p
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"crypto/x509"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
@@ -10,6 +11,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/url" "net/url"
"reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -25,6 +27,8 @@ var (
const ( const (
retryLimit = 20 retryLimit = 20
retryInterval = 10 * time.Second retryInterval = 10 * time.Second
dtma = 20
ddtma = 5
) )
type P2PNetwork struct { type P2PNetwork struct {
@@ -34,9 +38,12 @@ type P2PNetwork struct {
restartCh chan bool restartCh chan bool
wgReconnect sync.WaitGroup wgReconnect sync.WaitGroup
writeMtx sync.Mutex writeMtx sync.Mutex
serverTs int64
localTs int64
hbTime time.Time hbTime time.Time
// for sync server time
t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds
dtma int64
ddt int64 // differential of dt
// msgMap sync.Map // msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID msgMap map[uint64]chan []byte //key: nodeID
msgMapMtx sync.Mutex msgMapMtx sync.Mutex
@@ -55,6 +62,8 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
running: true, running: true,
msgMap: make(map[uint64]chan []byte), msgMap: make(map[uint64]chan []byte),
limiter: newBandwidthLimiter(config.ShareBandwidth), limiter: newBandwidthLimiter(config.ShareBandwidth),
dt: 0,
ddt: 0,
} }
instance.msgMap[0] = make(chan []byte) // for gateway instance.msgMap[0] = make(chan []byte) // for gateway
if config != nil { if config != nil {
@@ -69,11 +78,13 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
func (pn *P2PNetwork) run() { func (pn *P2PNetwork) run() {
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime) heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "")
for pn.running { for pn.running {
select { select {
case <-heartbeatTimer.C: case <-heartbeatTimer.C:
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "") pn.write(MsgHeartbeat, 0, "")
case <-pn.restartCh: case <-pn.restartCh:
pn.online = false pn.online = false
pn.wgReconnect.Wait() // wait read/autorunapp goroutine end pn.wgReconnect.Wait() // wait read/autorunapp goroutine end
@@ -87,7 +98,7 @@ func (pn *P2PNetwork) run() {
} }
func (pn *P2PNetwork) Connect(timeout int) bool { func (pn *P2PNetwork) Connect(timeout int) bool {
// waiting for login response // waiting for heartbeat
for i := 0; i < (timeout / 1000); i++ { for i := 0; i < (timeout / 1000); i++ {
if pn.hbTime.After(time.Now().Add(-NetworkHeartbeatTime)) { if pn.hbTime.After(time.Now().Add(-NetworkHeartbeatTime)) {
return true return true
@@ -103,30 +114,18 @@ func (pn *P2PNetwork) runAll() {
allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps
for _, config := range allApps { for _, config := range allApps {
if config.nextRetryTime.After(time.Now()) { if config.nextRetryTime.After(time.Now()) || config.Enabled == 0 || config.retryNum >= retryLimit {
continue
}
if config.Enabled == 0 {
continue continue
} }
if config.AppName == "" { if config.AppName == "" {
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort) config.AppName = config.ID()
} }
appExist := false if i, ok := pn.apps.Load(config.ID()); ok {
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) if app := i.(*p2pApp); app.isActive() {
if ok {
app := i.(*p2pApp)
appExist = true
if app.isActive() {
continue continue
} }
}
if appExist {
pn.DeleteApp(*config) pn.DeleteApp(*config)
} }
if config.retryNum >= retryLimit {
continue
}
if config.retryNum > 0 { // first time not show reconnect log if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum) gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
@@ -151,6 +150,7 @@ func (pn *P2PNetwork) runAll() {
} }
} }
} }
func (pn *P2PNetwork) autorunApp() { func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start") gLog.Println(LvINFO, "autorunApp start")
pn.wgReconnect.Add(1) pn.wgReconnect.Add(1)
@@ -172,9 +172,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return nil, 0, "", errors.New("read MsgRelayNodeRsp error") return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
} }
rsp := RelayNodeRsp{} rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp) if err := json.Unmarshal(body, &rsp); err != nil {
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
} }
if rsp.RelayName == "" || rsp.RelayToken == 0 { if rsp.RelayName == "" || rsp.RelayToken == 0 {
@@ -207,9 +205,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error") return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error")
} }
rspID := TunnelMsg{} rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID) if err = json.Unmarshal(body, &rspID); err != nil {
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
} }
return t, rspID.ID, rsp.Mode, err return t, rspID.ID, rsp.Mode, err
@@ -224,7 +220,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
} }
// check if app already exist? // check if app already exist?
appExist := false appExist := false
_, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) _, ok := pn.apps.Load(config.ID())
if ok { if ok {
appExist = true appExist = true
} }
@@ -290,11 +286,13 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
key: appKey, key: appKey,
tunnel: t, tunnel: t,
config: config, config: config,
iptree: NewIPTree(config.Whitelist),
rtid: rtid, rtid: rtid,
relayNode: relayNode, relayNode: relayNode,
relayMode: relayMode, relayMode: relayMode,
hbTime: time.Now()} hbTime: time.Now()}
pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app) pn.apps.Store(config.ID(), &app)
gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id)
if err == nil { if err == nil {
go app.listen() go app.listen()
} }
@@ -305,16 +303,37 @@ func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort) gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort)
defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort) defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort)
// close the apps of this config // close the apps of this config
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) i, ok := pn.apps.Load(config.ID())
if ok { if ok {
app := i.(*p2pApp) app := i.(*p2pApp)
gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) gLog.Printf(LvINFO, "app %s exist, delete it", app.config.AppName)
app.close() app.close()
pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) pn.apps.Delete(config.ID())
} }
} }
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) { func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) {
// find existing tunnel to peer
pn.allTunnels.Range(func(id, i interface{}) bool {
tmpt := i.(*P2PTunnel)
if tmpt.config.PeerNode == config.PeerNode {
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := tmpt.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
tmpt.close()
} else {
t = tmpt
}
return false
}
return true
})
return t
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
isClient := false isClient := false
@@ -323,64 +342,34 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
tid = rand.Uint64() tid = rand.Uint64()
isClient = true isClient = true
} }
exist := false
// find existing tunnel to peer
var t *P2PTunnel
pn.allTunnels.Range(func(id, i interface{}) bool {
t = i.(*P2PTunnel)
if t.config.PeerNode == config.PeerNode {
// server side force close existing tunnel
if !isClient {
t.close()
return false
}
// client side checking if t = pn.findTunnel(&config); t != nil {
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := t.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
t.close()
} else {
// active
exist = true
}
return false
}
return true
})
if exist {
return t, nil return t, nil
} }
// create tunnel if not exist // create tunnel if not exist
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
pn.msgMapMtx.Lock() pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50) pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50)
pn.msgMapMtx.Unlock() pn.msgMapMtx.Unlock()
// server side // server side
if !isClient { if !isClient {
err := pn.newTunnel(t, tid, isClient) t, err = pn.newTunnel(config, tid, isClient)
return t, err // always return return t, err // always return
} }
// client side // client side
// peer info // peer info
initErr := t.requestPeerInfo() initErr := pn.requestPeerInfo(&config)
if initErr != nil { if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr) gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr return nil, initErr
} }
var err error
// try TCP6 // try TCP6
if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) { if IsIPv6(config.peerIPv6) && IsIPv6(pn.config.publicIPv6) {
gLog.Println(LvINFO, "try TCP6") gLog.Println(LvINFO, "try TCP6")
t.config.linkMode = LinkModeTCP6 config.linkMode = LinkModeTCP6
t.config.isUnderlayServer = 0 config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil { if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil return t, nil
} }
} }
@@ -388,59 +377,74 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
// TODO: try UDP6 // TODO: try UDP6
// try TCP4 // try TCP4
if t.config.hasIPv4 == 1 || t.pn.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 || t.pn.config.hasUPNPorNATPMP == 1 { if config.hasIPv4 == 1 || pn.config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 || pn.config.hasUPNPorNATPMP == 1 {
gLog.Println(LvINFO, "try TCP4") gLog.Println(LvINFO, "try TCP4")
t.config.linkMode = LinkModeTCP4 config.linkMode = LinkModeTCP4
if t.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 { if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 {
t.config.isUnderlayServer = 0 config.isUnderlayServer = 0
} else { } else {
t.config.isUnderlayServer = 1 config.isUnderlayServer = 1
} }
if err = pn.newTunnel(t, tid, isClient); err == nil { if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil return t, nil
} }
} }
// TODO: try UDP4 // TODO: try UDP4
// try TCPPunch // try TCPPunch
if t.config.peerNatType == NATCone && t.pn.config.natType == NATCone { // TODO: support c2s for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch") gLog.Println(LvINFO, "try TCP4 Punch")
t.config.linkMode = LinkModeTCPPunch config.linkMode = LinkModeTCPPunch
t.config.isUnderlayServer = 0 config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil { if t, err = pn.newTunnel(config, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok") gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil return t, nil
} }
} }
}
// try UDPPunch // try UDPPunch
if t.config.peerNatType == NATCone || t.pn.config.natType == NATCone { for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone || pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch") gLog.Println(LvINFO, "try UDP4 Punch")
t.config.linkMode = LinkModeUDPPunch config.linkMode = LinkModeUDPPunch
t.config.isUnderlayServer = 0 config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil { if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil return t, nil
} }
} }
if !(config.peerNatType == NATCone && pn.config.natType == NATCone) { // not cone2cone, no more try
break
}
}
return nil, ErrorHandshake // only ErrorHandshake will try relay return nil, ErrorHandshake // only ErrorHandshake will try relay
} }
func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error { func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) {
if existTunnel := pn.findTunnel(&config); existTunnel != nil {
return existTunnel, nil
}
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
t.initPort() t.initPort()
if isClient { if isClient {
if err := t.connect(); err != nil { if err = t.connect(); err != nil {
gLog.Println(LvERROR, "p2pTunnel connect error:", err) gLog.Println(LvERROR, "p2pTunnel connect error:", err)
return err return
} }
} else { } else {
if err := t.listen(); err != nil { if err = t.listen(); err != nil {
gLog.Println(LvERROR, "p2pTunnel listen error:", err) gLog.Println(LvERROR, "p2pTunnel listen error:", err)
return err return
} }
} }
// store it when success // store it when success
gLog.Printf(LvDEBUG, "store tunnel %d", tid) gLog.Printf(LvDEBUG, "store tunnel %d", tid)
pn.allTunnels.Store(tid, t) pn.allTunnels.Store(tid, t)
return nil return
} }
func (pn *P2PNetwork) init() error { func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start") gLog.Println(LvINFO, "init start")
@@ -471,7 +475,12 @@ func (pn *P2PNetwork) init() error {
gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/api/v1/login" uri := "/api/v1/login"
config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(rootCA))
config := tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config websocket.DefaultDialer.TLSClientConfig = &config
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri} u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query() q := u.Query()
@@ -543,17 +552,14 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
case MsgLogin: case MsgLogin:
// gLog.Println(LevelINFO,string(msg)) // gLog.Println(LevelINFO,string(msg))
rsp := LoginRsp{} rsp := LoginRsp{}
err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp) if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
gLog.Printf(LvERROR, "wrong login response:%s", err)
return return
} }
if rsp.Error != 0 { if rsp.Error != 0 {
gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail) gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail)
pn.running = false pn.running = false
} else { } else {
pn.serverTs = rsp.Ts
pn.hbTime = time.Now()
pn.config.Token = rsp.Token pn.config.Token = rsp.Token
pn.config.User = rsp.User pn.config.User = rsp.User
gConf.setToken(rsp.Token) gConf.setToken(rsp.Token)
@@ -562,12 +568,28 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
gConf.setNode(rsp.Node) gConf.setNode(rsp.Node)
pn.config.Node = rsp.Node pn.config.Node = rsp.Node
} }
pn.localTs = time.Now().Unix() gLog.Printf(LvINFO, "login ok. user=%s,node=%s", rsp.User, rsp.Node)
gLog.Printf(LvINFO, "login ok. user=%s,node=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Node, rsp.Ts, pn.localTs)
} }
case MsgHeartbeat: case MsgHeartbeat:
gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok") gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok")
pn.hbTime = time.Now() pn.hbTime = time.Now()
rtt := pn.hbTime.UnixNano() - pn.t1
t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8]))
dt := pn.t1 + rtt/2 - t2
if pn.dtma == 0 {
pn.dtma = dt
} else {
ddt := dt - pn.dt
// if pn.ddt == 0 {
pn.ddt = ddt
// } else {
// pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow
// }
pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow
}
pn.dt = dt
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond))
case MsgPush: case MsgPush:
handlePush(pn, head.SubType, msg) handlePush(pn, head.SubType, msg)
default: default:
@@ -723,3 +745,30 @@ func (pn *P2PNetwork) refreshIPv6(force bool) {
} }
pn.config.publicIPv6 = string(buf[:n]) pn.config.publicIPv6 = string(buf[:n])
} }
func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error {
// request peer info
pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode})
head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
if err := json.Unmarshal(body, &rsp); err != nil {
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
config.peerVersion = rsp.Version
config.hasIPv4 = rsp.HasIPv4
config.peerIP = rsp.IPv4
config.peerIPv6 = rsp.IPv6
config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
config.peerNatType = rsp.NatType
///
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"reflect"
"sync" "sync"
"time" "time"
) )
@@ -29,36 +30,9 @@ type P2PTunnel struct {
coneLocalPort int coneLocalPort int
coneNatPort int coneNatPort int
linkModeWeb string // use config.linkmode linkModeWeb string // use config.linkmode
punchTs uint64
} }
func (t *P2PTunnel) requestPeerInfo() error {
// request peer info
t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode})
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, time.Second*10)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong QueryPeerInfoRsp:%s", err)
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
t.config.peerVersion = rsp.Version
t.config.hasIPv4 = rsp.HasIPv4
t.config.peerIP = rsp.IPv4
t.config.peerIPv6 = rsp.IPv6
t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
t.config.peerNatType = rsp.NatType
///
return nil
}
func (t *P2PTunnel) initPort() { func (t *P2PTunnel) initPort() {
t.running = true t.running = true
t.hbMtx.Lock() t.hbMtx.Lock()
@@ -74,15 +48,15 @@ func (t *P2PTunnel) initPort() {
t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort
} }
if t.config.linkMode == LinkModeUDPPunch { if t.config.linkMode == LinkModeUDPPunch {
// prepare one random cone hole // prepare one random cone hole manually
_, natPort, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort) _, natPort, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort)
t.coneLocalPort = localPort t.coneLocalPort = localPort
t.coneNatPort = natPort t.coneNatPort = natPort
} }
if t.config.linkMode == LinkModeTCPPunch { if t.config.linkMode == LinkModeTCPPunch {
// prepare one random cone hole // prepare one random cone hole by system automatically
_, natPort := natTCP(t.pn.config.ServerHost, IfconfigPort1, localPort) _, natPort, localPort2 := natTCP(t.pn.config.ServerHost, IfconfigPort1)
t.coneLocalPort = localPort t.coneLocalPort = localPort2
t.coneNatPort = natPort t.coneNatPort = natPort
} }
t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort}
@@ -112,14 +86,13 @@ func (t *P2PTunnel) connect() error {
req.Token = t.pn.config.Token req.Token = t.pn.config.Token
} }
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10) head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout)
if head == nil { if head == nil {
return errors.New("connect error") return errors.New("connect error")
} }
rsp := PushConnectRsp{} rsp := PushConnectRsp{}
err := json.Unmarshal(body, &rsp) if err := json.Unmarshal(body, &rsp); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err)
return err return err
} }
// gLog.Println(LevelINFO, rsp) // gLog.Println(LevelINFO, rsp)
@@ -133,7 +106,8 @@ func (t *P2PTunnel) connect() error {
t.config.peerVersion = rsp.Version t.config.peerVersion = rsp.Version
t.config.peerConeNatPort = rsp.ConeNatPort t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP t.config.peerIP = rsp.FromIP
err = t.start() t.punchTs = rsp.PunchTs
err := t.start()
if err != nil { if err != nil {
gLog.Println(LvERROR, "handshake error:", err) gLog.Println(LvERROR, "handshake error:", err)
err = ErrorHandshake err = ErrorHandshake
@@ -163,14 +137,10 @@ func (t *P2PTunnel) isActive() bool {
} }
func (t *P2PTunnel) checkActive() bool { func (t *P2PTunnel) checkActive() bool {
hbt := time.Now() if t.conn == nil {
t.hbMtx.Lock()
if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) {
t.hbMtx.Unlock()
return false return false
} }
t.hbMtx.Unlock() hbt := time.Now()
// hbtime within TunnelHeartbeatTime, check it now
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil) t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
isActive := false isActive := false
// wait at most 5s // wait at most 5s
@@ -182,6 +152,7 @@ func (t *P2PTunnel) checkActive() bool {
t.hbMtx.Unlock() t.hbMtx.Unlock()
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
gLog.Printf(LvINFO, "checkActive %t. hbtime=%d", isActive, t.hbTime)
return isActive return isActive
} }
@@ -213,6 +184,13 @@ func (t *P2PTunnel) handshake() error {
return err return err
} }
} }
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode) gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode)
var err error var err error
// TODO: handle NATNone, nodes with public ip has no punching // TODO: handle NATNone, nodes with public ip has no punching
@@ -241,7 +219,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) {
case LinkModeTCP6: case LinkModeTCP6:
t.conn, err = t.connectUnderlayTCP6() t.conn, err = t.connectUnderlayTCP6()
case LinkModeTCP4: case LinkModeTCP4:
t.conn, err = t.connectUnderlayTCP() t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell
case LinkModeTCPPunch: case LinkModeTCPPunch:
t.conn, err = t.connectUnderlayTCP() t.conn, err = t.connectUnderlayTCP()
case LinkModeUDPPunch: case LinkModeUDPPunch:
@@ -298,7 +276,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
return nil, fmt.Errorf("quic listen error:%s", e) return nil, fmt.Errorf("quic listen error:%s", e)
} }
} }
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) gLog.Println(LvDEBUG, "quic dial to ", t.ra.String())
qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil { if e != nil {
@@ -327,8 +305,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
defer gLog.Println(LvINFO, "connectUnderlayTCP end") defer gLog.Println(LvINFO, "connectUnderlayTCP end")
var qConn *underlayTCP var qConn *underlayTCP
if t.config.isUnderlayServer == 1 { if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t)
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen TCP error:%s", err) return nil, fmt.Errorf("listen TCP error:%s", err)
} }
@@ -345,9 +322,20 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
return qConn, nil return qConn, nil
} }
//else // client side
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) if t.config.linkMode == LinkModeTCP4 {
gLog.Println(LvDEBUG, "TCP dial to ", t.config.peerIP, ":", t.config.peerConeNatPort) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
} else { //tcp punch should sleep for punch the same time
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort))
qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode) qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
if err != nil { if err != nil {
return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err) return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err)
@@ -374,7 +362,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
var qConn *underlayTCP6 var qConn *underlayTCP6
if t.config.isUnderlayServer == 1 { if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP6(t.coneNatPort, TunnelIdleTimeout) qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout)
if err != nil { if err != nil {
return nil, fmt.Errorf("listen TCP6 error:%s", err) return nil, fmt.Errorf("listen TCP6 error:%s", err)
} }
@@ -392,7 +380,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
} }
//else //else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6)
qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort)
if err != nil { if err != nil {
@@ -432,6 +420,7 @@ func (t *P2PTunnel) readLoop() {
} }
switch head.SubType { switch head.SubType {
case MsgTunnelHeartbeat: case MsgTunnelHeartbeat:
t.hbTime = time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil) t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil)
gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id) gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id)
case MsgTunnelHeartbeatAck: case MsgTunnelHeartbeatAck:
@@ -473,9 +462,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.relay(tunnelID, body[8:]) t.pn.relay(tunnelID, body[8:])
case MsgRelayHeartbeat: case MsgRelayHeartbeat:
req := RelayHeartbeat{} req := RelayHeartbeat{}
err := json.Unmarshal(body, &req) if err := json.Unmarshal(body, &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err)
continue continue
} }
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID)
@@ -495,9 +483,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.updateAppHeartbeat(req.AppID) t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq: case MsgOverlayConnectReq:
req := OverlayConnectReq{} req := OverlayConnectReq{}
err := json.Unmarshal(body, &req) if err := json.Unmarshal(body, &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err)
continue continue
} }
// app connect only accept token(not relay totp token), avoid someone using the share relay node's token // app connect only accept token(not relay totp token), avoid someone using the share relay node's token
@@ -507,7 +494,7 @@ func (t *P2PTunnel) readLoop() {
} }
overlayID := req.ID overlayID := req.ID
gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %s:%d", req.AppID, overlayID, req.DstIP, req.DstPort)
oConn := overlayConn{ oConn := overlayConn{
tunnel: t, tunnel: t,
id: overlayID, id: overlayID,
@@ -520,7 +507,8 @@ func (t *P2PTunnel) readLoop() {
if req.Protocol == "udp" { if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort}) oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
} else { } else {
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout)
} }
if err != nil { if err != nil {
gLog.Println(LvERROR, err) gLog.Println(LvERROR, err)
@@ -539,9 +527,8 @@ func (t *P2PTunnel) readLoop() {
go oConn.run() go oConn.run()
case MsgOverlayDisconnectReq: case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{} req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req) if err := json.Unmarshal(body, &req); err != nil {
if err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err)
continue continue
} }
overlayID := req.ID overlayID := req.ID
@@ -593,8 +580,10 @@ func (t *P2PTunnel) listen() error {
FromIP: t.pn.config.publicIP, FromIP: t.pn.config.publicIP,
ConeNatPort: t.coneNatPort, ConeNatPort: t.coneNatPort,
ID: t.id, ID: t.id,
PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - t.pn.dt),
Version: OpenP2PVersion, Version: OpenP2PVersion,
} }
t.punchTs = rsp.PunchTs
// only private node set ipv6 // only private node set ipv6
if t.config.fromToken == t.pn.config.Token { if t.config.fromToken == t.pn.config.Token {
t.pn.refreshIPv6(false) t.pn.refreshIPv6(false)

View File

@@ -10,9 +10,10 @@ import (
"time" "time"
) )
const OpenP2PVersion = "3.8.0" const OpenP2PVersion = "3.10.3"
const ProductName string = "openp2p" const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0" const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0"
const ( const (
IfconfigPort1 = 27180 IfconfigPort1 = 27180
@@ -134,23 +135,26 @@ const (
const ( const (
ReadBuffLen = 4096 // for UDP maybe not enough ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15 TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s
TunnelIdleTimeout = time.Minute TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379 SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510 // SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond SymmetricHandshakeInterval = time.Millisecond
SymmetricHandshakeAckTimeout = time.Second * 5 HandshakeTimeout = time.Second * 5
PeerAddRelayTimeount = time.Second * 20 PeerAddRelayTimeount = time.Second * 30 // peer need times
CheckActiveTimeout = time.Second * 5 CheckActiveTimeout = time.Second * 5
PaddingSize = 16 PaddingSize = 16
AESKeySize = 16 AESKeySize = 16
MaxRetry = 10 MaxRetry = 10
Cone2ConePunchMaxRetry = 1
RetryInterval = time.Second * 30 RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1 PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5 NatTestTimeout = time.Second * 5
UDPReadTimeout = time.Second * 5 UDPReadTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10 ClientAPITimeout = time.Second * 10
UnderlayConnectTimeout = time.Second * 10
MaxDirectTry = 3 MaxDirectTry = 3
PunchTsDelay = time.Second * 2
) )
// NATNone has public ip // NATNone has public ip
@@ -240,6 +244,7 @@ type PushConnectRsp struct {
ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole
FromIP string `json:"fromIP,omitempty"` FromIP string `json:"fromIP,omitempty"`
ID uint64 `json:"id,omitempty"` ID uint64 `json:"id,omitempty"`
PunchTs uint64 `json:"punchts,omitempty"` // server timestamp
Version string `json:"version,omitempty"` Version string `json:"version,omitempty"`
} }
type PushRsp struct { type PushRsp struct {
@@ -346,9 +351,10 @@ type AppInfo struct {
AppName string `json:"appName,omitempty"` AppName string `json:"appName,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
Protocol string `json:"protocol,omitempty"` Protocol string `json:"protocol,omitempty"`
Whitelist string `json:"whitelist,omitempty"`
SrcPort int `json:"srcPort,omitempty"` SrcPort int `json:"srcPort,omitempty"`
Protocol0 string `json:"protocol0,omitempty"` Protocol0 string `json:"protocol0,omitempty"`
SrcPort0 int `json:"srcPort0,omitempty"` SrcPort0 int `json:"srcPort0,omitempty"` // srcport+protocol is uneque, use as old app id
NatType int `json:"natType,omitempty"` NatType int `json:"natType,omitempty"`
PeerNode string `json:"peerNode,omitempty"` PeerNode string `json:"peerNode,omitempty"`
DstPort int `json:"dstPort,omitempty"` DstPort int `json:"dstPort,omitempty"`
@@ -435,3 +441,25 @@ type QueryPeerInfoRsp struct {
IPv6 string `json:"IPv6,omitempty"` // if public relay node, ipv6 not set IPv6 string `json:"IPv6,omitempty"` // if public relay node, ipv6 not set
HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"` HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"`
} }
const rootCA = `-----BEGIN CERTIFICATE-----
MIIDhTCCAm0CFHm0cd8dnGCbUW/OcS56jf0gvRk7MA0GCSqGSIb3DQEBCwUAMH4x
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEGA1UECgwKb3BlbnAycC5jbjET
MBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwKb3BlbnAycC5jbjEjMCEGCSqG
SIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20wIBcNMjMwODAxMDkwMjMwWhgP
MjEyMzA3MDgwOTAyMzBaMH4xCzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEG
A1UECgwKb3BlbnAycC5jbjETMBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwK
b3BlbnAycC5jbjEjMCEGCSqGSIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20w
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDWg8wPy5hBLUaY4WOXayKu
+magEz1LAY0krzXYSZaSCvGMwA0cervwAqgKfiiZEhho5UNA5iVOJ6bO1RL9H7Vp
4HuW9BttDU/NQHguD8pyqx06Kaosz5LRw8USz1BCWWFdmi8Mv4I0omtd7m6lbWnY
nrjQKLYPahPW481jUfJPqR6wUTnBuBMr2ZAGqmFR4Lhqs9B1P9GeBfDWNwVApJUC
VEhbElukRJxdUvWeJ5+HMENKQcHCTTgmQbmDLMobHXs3Xf7fT9qC76wOe9LFHI6L
dAww9gryQhxWauQl1NO8aGJTFu+3wgnKBdTMJmF/1iuZYXJOCR1solwqU1hCgBsj
AgMBAAEwDQYJKoZIhvcNAQELBQADggEBADp153YNVN8p6/3PLnXxHBDeDViAfeQd
VJmy8eH1LTq/xtUY71HGSpL7iIBNoQdDTHfsg3c6ZANBCxbO/7AhFAzPt1aK8eHy
XuEiW0Z6R8np1Khh3alCOfD15tKcjok//Wxisbz+YItlbDus/eWRbLGB3HGrzn4l
GB18jw+G7o4U3rGX8agHqVGQEd06gk1ZaprASpTGwSsv4A5ehosjT1d7re8Z5eD4
RVtXS+DplMClQ5QSlv3StwcWOsjyiAimNfLEU5xoEfq17yOJUTU1OTL4YOt16QUc
C1tnzFr3k/ioqFR7cnyzNrbjlfPOmO9l2WReEbMP3bvaSHm6EcpJKS8=
-----END CERTIFICATE-----`

View File

@@ -15,10 +15,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/lucas-clemente/quic-go" "github.com/quic-go/quic-go"
) )
//quic.DialContext do not support version 44,disable it // quic.DialContext do not support version 44,disable it
var quicVersion []quic.VersionNumber var quicVersion []quic.VersionNumber
type underlayQUIC struct { type underlayQUIC struct {
@@ -87,7 +87,7 @@ func (conn *underlayQUIC) CloseListener() {
} }
func (conn *underlayQUIC) Accept() error { func (conn *underlayQUIC) Accept() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout)
defer cancel() defer cancel()
sess, err := conn.listener.Accept(ctx) sess, err := conn.listener.Accept(ctx)
if err != nil { if err != nil {

View File

@@ -67,21 +67,30 @@ func (conn *underlayTCP) Close() error {
return conn.Conn.Close() return conn.Conn.Close()
} }
func listenTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) { func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) {
if mode == LinkModeTCPPunch { if mode == LinkModeTCPPunch {
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
if err != nil { if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err) gLog.Println(LvDEBUG, "send tcp punch: ", err)
return nil, err return nil, err
} }
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil
} }
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort)) addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort))
l, err := net.ListenTCP("tcp", addr) l, err := net.ListenTCP("tcp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept() c, err := l.Accept()
defer l.Close() defer l.Close()
if err != nil { if err != nil {
@@ -94,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e
var c net.Conn var c net.Conn
var err error var err error
if mode == LinkModeTCPPunch { if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
} else { } else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
} }
if err != nil { if err != nil {

View File

@@ -73,7 +73,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
return nil, err return nil, err
} }
defer l.Close() defer l.Close()
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept() c, err := l.Accept()
defer l.Close() defer l.Close()
if err != nil { if err != nil {
@@ -83,7 +83,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
} }
func dialTCP6(host string, port int) (*underlayTCP6, error) { func dialTCP6(host string, port int) (*underlayTCP6, error) {
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), SymmetricHandshakeAckTimeout) c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout)
if err != nil { if err != nil {
gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err) gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err)
return nil, err return nil, err

View File

@@ -5,6 +5,7 @@ import (
"archive/zip" "archive/zip"
"compress/gzip" "compress/gzip"
"crypto/tls" "crypto/tls"
"crypto/x509"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -19,15 +20,19 @@ import (
func update(host string, port int) { func update(host string, port int) {
gLog.Println(LvINFO, "update start") gLog.Println(LvINFO, "update start")
defer gLog.Println(LvINFO, "update end") defer gLog.Println(LvINFO, "update end")
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(rootCA))
c := http.Client{ c := http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{RootCAs: caCertPool,
InsecureSkipVerify: false},
}, },
Timeout: time.Second * 30, Timeout: time.Second * 30,
} }
goos := runtime.GOOS goos := runtime.GOOS
goarch := runtime.GOARCH goarch := runtime.GOARCH
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s", host, port, OpenP2PVersion, goos, goarch)) rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node))
if err != nil { if err != nil {
gLog.Println(LvERROR, "update:query update list failed:", err) gLog.Println(LvERROR, "update:query update list failed:", err)
return return
@@ -43,8 +48,7 @@ func update(host string, port int) {
return return
} }
updateInfo := UpdateInfo{} updateInfo := UpdateInfo{}
err = json.Unmarshal(rspBuf, &updateInfo) if err = json.Unmarshal(rspBuf, &updateInfo); err != nil {
if err != nil {
gLog.Println(LvERROR, rspBuf, " update info decode error:", err) gLog.Println(LvERROR, rspBuf, " update info decode error:", err)
return return
} }
@@ -69,7 +73,7 @@ func updateFile(url string, checksum string, dst string) error {
return err return err
} }
tr := &http.Transport{ tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
} }
client := &http.Client{Transport: tr} client := &http.Client{Transport: tr}
response, err := client.Get(url) response, err := client.Get(url)

View File

@@ -5,6 +5,7 @@ package openp2p
import ( import (
"bytes" "bytes"
"crypto/tls"
"encoding/xml" "encoding/xml"
"errors" "errors"
"fmt" "fmt"
@@ -181,7 +182,12 @@ func localIPv4() string { // TODO: multi nic will wrong
} }
func getServiceURL(rootURL string) (url, urnDomain string, err error) { func getServiceURL(rootURL string) (url, urnDomain string, err error) {
r, err := http.Get(rootURL) client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Timeout: time.Second * 3}
r, err := client.Get(rootURL)
if err != nil { if err != nil {
return return
} }

12
docker/Dockerfile Executable file
View File

@@ -0,0 +1,12 @@
FROM alpine:3.18.2
# Replace the default Alpine repositories with Aliyun mirrors
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \
apk add --no-cache ca-certificates && \
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
COPY get-client.sh /
RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh
ENTRYPOINT ["/openp2p"]

43
docker/get-client.sh Executable file
View File

@@ -0,0 +1,43 @@
#!/bin/sh
echo "Running on platform: $TARGETPLATFORM"
# TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/')
echo "Running on platform: $TARGETPLATFORM"
sysType="linux-amd64"
archType=$(uname -m)
if [[ $archType == aarch64 ]] ;
then
sysType="linux-arm64"
elif [[ $archType == arm* ]] ;
then
sysType="linux-arm"
elif [[ $archType == i*86 ]] ;
then
sysType="linux-386"
elif [[ $archType == mips ]] ;
then
sysType="linux-mipsle"
ls /lib |grep mipsel
if [[ $? -ne 0 ]]; then
# mipsel not found, it's mipseb
sysType="linux-mipsbe"
fi
fi
url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz"
echo "download $url start"
if [ -f /usr/bin/curl ]; then
curl -k -o openp2p.tar.gz $url
else
wget --no-check-certificate -O openp2p.tar.gz $url
fi
if [ $? -ne 0 ]; then
echo "download error $?"
exit 9
fi
echo "download ok"
tar -xzvf openp2p.tar.gz
chmod +x openp2p
pwd
ls -l
exit 0

27
go.mod
View File

@@ -3,28 +3,27 @@ module openp2p
go 1.18 go 1.18
require ( require (
github.com/emirpasic/gods v1.18.1
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/lucas-clemente/quic-go v0.27.0
github.com/openp2p-cn/go-reuseport v0.3.2 github.com/openp2p-cn/go-reuseport v0.3.2
github.com/openp2p-cn/service v1.0.0 github.com/openp2p-cn/service v1.0.0
github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f github.com/quic-go/quic-go v0.34.0
golang.org/x/sys v0.5.0
) )
require ( require (
github.com/cheekybits/genny v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/kardianos/service v1.2.2 // indirect github.com/kardianos/service v1.2.2 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect golang.org/x/crypto v0.4.0 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/mod v0.6.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.7.0 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/tools v0.2.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/protobuf v1.28.1 // indirect google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
) )