Compare commits

...

10 Commits

Author SHA1 Message Date
TenderIronh
b72ede9a6a optimize hole punching 2023-09-04 23:11:42 +08:00
Ahackerl
b39fab2188 fix unused sync (#47) 2023-09-04 09:07:20 +08:00
W192547975
9b0525294a echo server in publicIPTest update (#39)
* Update nat.go
func publicIPTest echo server
2023-08-24 10:55:09 +08:00
W192547975
276a4433f1 Bitset logmode update (#40)
* Update log.go

bitset loglevel

---------

Co-authored-by: OpenP2P <89245779+TenderIronh@users.noreply.github.com>
2023-08-10 11:23:36 +08:00
TenderIronh
e21adebc26 3.10.3 2023-08-09 22:52:29 +08:00
OpenP2P
b2a7619bd6 Merge pull request #43 from W192547975/fixbug
Bug fixing from 2023/8/1
2023-08-08 16:50:25 +08:00
W192547975
fe4022ba6c Update openp2p.go 2023-08-06 23:38:14 +08:00
W192547975
82c74b4f85 Update config.go 2023-08-06 23:31:17 +08:00
TenderIronh
0af65b7204 tls verify server and support docker 2023-08-03 23:05:45 +08:00
TenderIronh
46b4f78010 optimize tcp and udp punch 2023-07-29 20:36:35 +08:00
22 changed files with 1089 additions and 529 deletions

View File

@@ -100,5 +100,8 @@ sudo /usr/local/openp2p/openp2p uninstall
## Docker运行
```
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
# 把YOUR-TOKENYOUR-NODE-NAME替换成自己的
docker run -d --restart=always --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --restart=always --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -102,5 +102,8 @@ sudo /usr/local/openp2p/openp2p uninstall
## Run with Docker
```
# Replace YOUR-TOKEN and YOUR-NODE-NAME with yours
docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -137,8 +137,7 @@ func netInfo() *NetInfo {
continue
}
rsp := NetInfo{}
err = json.Unmarshal(buf[:n], &rsp)
if err != nil {
if err = json.Unmarshal(buf[:n], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong NetInfo:%s", err)
continue
}

View File

@@ -3,8 +3,10 @@ package openp2p
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"strconv"
"sync"
"time"
)
@@ -13,14 +15,16 @@ var gConf Config
type AppConfig struct {
// required
AppName string
Protocol string
SrcPort int
PeerNode string
DstPort int
DstHost string
PeerUser string
Enabled int // default:1
AppName string
Protocol string
Whitelist string
SrcPort int
PeerNode string
DstPort int
DstHost string
PeerUser string
RelayNode string
Enabled int // default:1
// runtime info
peerVersion string
peerToken uint64
@@ -41,6 +45,10 @@ type AppConfig struct {
isUnderlayServer int // TODO: bool?
}
func (c *AppConfig) ID() string {
return fmt.Sprintf("%s%d", c.Protocol, c.SrcPort)
}
// TODO: add loglevel, maxlogfilesize
type Config struct {
Network NetworkConfig `json:"network"`
@@ -120,7 +128,7 @@ func (c *Config) save() {
}
func init() {
gConf.LogLevel = 1
gConf.LogLevel = int(LvINFO)
gConf.Network.ShareBandwidth = 10
gConf.Network.ServerHost = "api.openp2p.cn"
gConf.Network.ServerPort = WsPort
@@ -169,6 +177,16 @@ func (c *Config) setShareBandwidth(bw int) {
defer c.save()
c.Network.ShareBandwidth = bw
}
func (c *Config) setIPv6(v6 string) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.Network.publicIPv6 = v6
}
func (c *Config) IPv6() string {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.Network.publicIPv6
}
type NetworkConfig struct {
// local info
@@ -201,16 +219,18 @@ func parseParams(subCommand string) {
node := fset.String("node", "", "node name. 8-31 characters. if not set, it will be hostname")
peerNode := fset.String("peernode", "", "peer node name that you want to connect")
dstIP := fset.String("dstip", "127.0.0.1", "destination ip ")
whiteList := fset.String("whitelist", "", "whitelist for p2pApp ")
dstPort := fset.Int("dstport", 0, "destination port ")
srcPort := fset.Int("srcport", 0, "source port ")
tcpPort := fset.Int("tcpport", 0, "tcp port for upnp or publicip")
protocol := fset.String("protocol", "tcp", "tcp or udp")
appName := fset.String("appname", "", "app name")
relayNode := fset.String("relaynode", "", "relaynode")
shareBandwidth := fset.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit")
daemonMode := fset.Bool("d", false, "daemonMode")
notVerbose := fset.Bool("nv", false, "not log console")
newconfig := fset.Bool("newconfig", false, "not load existing config.json")
logLevel := fset.Int("loglevel", 0, "0:info 1:warn 2:error 3:debug")
logLevel := fset.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
if subCommand == "" { // no subcommand
fset.Parse(os.Args[1:])
} else {
@@ -220,10 +240,12 @@ func parseParams(subCommand string) {
config := AppConfig{Enabled: 1}
config.PeerNode = *peerNode
config.DstHost = *dstIP
config.Whitelist = *whiteList
config.DstPort = *dstPort
config.SrcPort = *srcPort
config.Protocol = *protocol
config.AppName = *appName
config.RelayNode = *relayNode
if !*newconfig {
gConf.load() // load old config. otherwise will clear all apps
}
@@ -253,17 +275,17 @@ func parseParams(subCommand string) {
gConf.setToken(*token)
}
})
// set default value
if gConf.Network.ServerHost == "" {
gConf.Network.ServerHost = *serverHost
}
if *node != "" {
if len(*node) < MinNodeNameLen {
gLog.Println(LvERROR, ErrNodeTooShort)
os.Exit(9)
}
gConf.Network.Node = *node
} else {
envNode := os.Getenv("OPENP2P_NODE")
if envNode != "" {
gConf.Network.Node = envNode
}
if gConf.Network.Node == "" { // if node name not set. use os.Hostname
gConf.Network.Node = defaultNodeName()
}
@@ -275,7 +297,14 @@ func parseParams(subCommand string) {
}
gConf.Network.TCPPort = *tcpPort
}
if *token == 0 {
envToken := os.Getenv("OPENP2P_TOKEN")
if envToken != "" {
if n, err := strconv.ParseUint(envToken, 10, 64); n != 0 && err == nil {
gConf.setToken(n)
}
}
}
gConf.Network.ServerPort = *serverPort
gConf.Network.UDPPort1 = UDPPort1
gConf.Network.UDPPort2 = UDPPort2

View File

@@ -18,4 +18,5 @@ var (
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
ErrConnectRelayNode = errors.New("connect relay node error")
)

View File

@@ -6,8 +6,8 @@ import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"reflect"
"time"
"github.com/openp2p-cn/totp"
@@ -22,62 +22,10 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function
req := PushConnectReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
break
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
err = handleConnectReq(pn, subType, msg)
case MsgPushRsp:
rsp := PushRsp{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong pushRsp:%s", err)
return err
}
@@ -88,9 +36,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
config := AppConfig{}
@@ -102,28 +49,20 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
// notify peer relay ready
msg := TunnelMsg{ID: t.id}
pn.push(r.From, MsgPushAddRelayTunnelRsp, msg)
} else {
pn.push(r.From, MsgPushAddRelayTunnelRsp, "error") // compatible with old version client, trigger unmarshal error
}
}(req)
case MsgPushAPPKey:
req := APPKeySync{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong APPKeySync:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
SaveKey(req.AppID, req.AppKey)
case MsgPushUpdate:
gLog.Println(LvINFO, "MsgPushUpdate")
update(pn.config.ServerHost, pn.config.ServerPort) // download new version first, then exec ./openp2p update
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
args := []string{"update"}
env := os.Environ()
cmd := exec.Command(targetPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
cmd.Env = env
err := cmd.Run()
err := update(pn.config.ServerHost, pn.config.ServerPort)
if err == nil {
os.Exit(0)
}
@@ -133,123 +72,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
os.Exit(0)
return err
case MsgPushReportApps:
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
pn.write(MsgReport, MsgReportApps, &req)
err = handleReportApps(pn, subType, msg)
case MsgPushReportLog:
gLog.Println(LvDEBUG, "MsgPushReportLog")
req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushReportLog:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
break
}
fi, err := f.Stat()
if err != nil {
break
}
if req.Offset == 0 && fi.Size() > 4096 {
req.Offset = fi.Size() - 4096
}
if req.Len <= 0 {
req.Len = 4096
}
f.Seek(req.Offset, 0)
if req.Len > 1024*1024 { // too large
break
}
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
break
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
pn.write(MsgReport, MsgPushReportLog, &rsp)
err = handleLog(pn, subType, msg)
case MsgPushEditApp:
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
err = handleEditApp(pn, subType, msg)
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
req := EditNode{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
@@ -259,9 +91,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
app := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
@@ -273,19 +104,195 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
req := PushDstNodeOnline{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
ch <- pushMsg{data: msg, ts: time.Now()}
}
return nil
return err
}
func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.Whitelist = newApp.Whitelist
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
}
func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
req := PushConnectReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt/int64(time.Second)) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
return nil
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
return pn.push(req.From, MsgPushConnectRsp, rsp)
}
func handleReportApps(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
Whitelist: config.Whitelist,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
return pn.write(MsgReport, MsgReportApps, &req)
}
func handleLog(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvDEBUG, "MsgPushReportLog")
const defaultLen = 1024 * 128
const maxLen = 1024 * 1024
req := ReportLogReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
return err
}
fi, err := f.Stat()
if err != nil {
return err
}
if req.Offset > fi.Size() {
req.Offset = fi.Size() - defaultLen
}
// verify input parameters
if req.Offset < 0 {
req.Offset = 0
}
if req.Len <= 0 || req.Len > maxLen {
req.Len = defaultLen
}
f.Seek(req.Offset, 0)
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
return err
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
return pn.write(MsgReport, MsgPushReportLog, &rsp)
}

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"math/rand"
"net"
"sync"
"time"
)
@@ -34,7 +33,6 @@ func handshakeC2C(t *P2PTunnel) (err error) {
}
}
t.ra, _ = net.ResolveUDPAddr("udp", ra.String())
// cone server side
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
@@ -43,13 +41,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
return err
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id)
gLog.Printf(LvINFO, "handshakeC2C ok")
return nil
}
}
// cone client side will only read handshake ack
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id)
_, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
@@ -65,8 +57,6 @@ func handshakeC2C(t *P2PTunnel) (err error) {
func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end")
// even if read timeout, continue handshake
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la)
@@ -74,11 +64,12 @@ func handshakeC2S(t *P2PTunnel) error {
return err
}
defer conn.Close()
go func() error {
gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort)
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
time.Sleep(SymmetricHandshakeInterval)
// time.Sleep(SymmetricHandshakeInterval)
dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2))
if err != nil {
return err
@@ -92,8 +83,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Println(LvDEBUG, "send symmetric handshake end")
return nil
}()
deadline := time.Now().Add(HandshakeTimeout)
err = conn.SetReadDeadline(deadline)
err = conn.SetReadDeadline(time.Now().Add(HandshakeTimeout))
if err != nil {
gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
return err
@@ -112,10 +102,27 @@ func handshakeC2S(t *P2PTunnel) error {
return err
}
t.ra, _ = net.ResolveUDPAddr("udp", dst.String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
for {
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2S handshake error")
return err
}
// waiting ack
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
break
}
}
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String())
_, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, t.ra.String())
_, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
return err
} else {
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
}
gLog.Printf(LvINFO, "handshakeC2S ok")
return nil
@@ -128,12 +135,11 @@ func handshakeS2C(t *P2PTunnel) error {
// sequencely udp send handshake, do not parallel send
gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort)
gotIt := false
gotMtx := sync.Mutex{}
for i := 0; i < SymmetricHandshakeNum; i++ {
// TODO: auto calc cost time
time.Sleep(SymmetricHandshakeInterval)
// time.Sleep(SymmetricHandshakeInterval)
go func(t *P2PTunnel) error {
conn, err := net.ListenUDP("udp", nil)
conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random?
if err != nil {
gLog.Printf(LvDEBUG, "listen error")
return err
@@ -145,38 +151,51 @@ func handshakeS2C(t *P2PTunnel) error {
// gLog.Println(LevelDEBUG, "one of the handshake error:", err)
return err
}
gotMtx.Lock()
defer gotMtx.Unlock()
if gotIt {
return nil
}
gotIt = true
t.la, _ = net.ResolveUDPAddr("udp", conn.LocalAddr().String())
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeS2C handshake error")
return err
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String())
gotCh <- t.la
return nil
// may read sereral MsgPunchHandshake
for {
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeS2C handshake error")
return err
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
break
} else {
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
}
}
}
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String())
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
gotIt = true
la, _ := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
gotCh <- la
return nil
} else {
gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck")
}
return nil
}(t)
}
gLog.Printf(LvDEBUG, "send symmetric handshake end")
gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect")
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
if compareVersion(t.config.peerVersion, SymmetricSimultaneouslySendVersion) == LESS { // compatible with old client
gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect")
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
}
select {
case <-time.After(HandshakeTimeout):
return fmt.Errorf("wait handshake failed")
case la := <-gotCh:
t.la = la
gLog.Println(LvDEBUG, "symmetric handshake ok", la)
gLog.Printf(LvINFO, "handshakeS2C ok")
}

155
core/iptree.go Normal file
View File

@@ -0,0 +1,155 @@
package openp2p
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"net"
"strings"
"sync"
"github.com/emirpasic/gods/trees/avltree"
"github.com/emirpasic/gods/utils"
)
type IPTree struct {
tree *avltree.Tree
treeMtx sync.RWMutex
}
// add 120k cost 0.5s
func (iptree *IPTree) AddIntIP(minIP uint32, maxIP uint32) bool {
if minIP > maxIP {
return false
}
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
newMinIP := minIP
newMaxIP := maxIP
cur := iptree.tree.Root
for {
if cur == nil {
break
}
curMaxIP := cur.Value.(uint32)
curMinIP := cur.Key.(uint32)
// newNode all in existNode, treat as inserted.
if newMinIP >= curMinIP && newMaxIP <= curMaxIP {
return true
}
// has no interset
if newMinIP > curMaxIP {
cur = cur.Children[1]
continue
}
if newMaxIP < curMinIP {
cur = cur.Children[0]
continue
}
// has interset, rm it and Add the new merged ip segment
iptree.tree.Remove(curMinIP)
if curMinIP < newMinIP {
newMinIP = curMinIP
}
if curMaxIP > newMaxIP {
newMaxIP = curMaxIP
}
cur = iptree.tree.Root
}
// put in the tree
iptree.tree.Put(newMinIP, newMaxIP)
return true
}
func (iptree *IPTree) Add(minIPStr string, maxIPStr string) bool {
var minIP, maxIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP(minIPStr).To4()), binary.BigEndian, &minIP)
binary.Read(bytes.NewBuffer(net.ParseIP(maxIPStr).To4()), binary.BigEndian, &maxIP)
return iptree.AddIntIP(minIP, maxIP)
}
func (iptree *IPTree) Contains(ipStr string) bool {
var ip uint32
binary.Read(bytes.NewBuffer(net.ParseIP(ipStr).To4()), binary.BigEndian, &ip)
return iptree.ContainsInt(ip)
}
func (iptree *IPTree) ContainsInt(ip uint32) bool {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
if iptree.tree == nil {
return false
}
n := iptree.tree.Root
for n != nil {
curMaxIP := n.Value.(uint32)
curMinIP := n.Key.(uint32)
switch {
case ip >= curMinIP && ip <= curMaxIP: // hit
return true
case ip < curMinIP:
n = n.Children[0]
default:
n = n.Children[1]
}
}
return false
}
func (iptree *IPTree) Size() int {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
return iptree.tree.Size()
}
func (iptree *IPTree) Print() {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
log.Println("size:", iptree.Size())
log.Println(iptree.tree.String())
}
func (iptree *IPTree) Clear() {
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
iptree.tree.Clear()
}
// input format 127.0.0.1,192.168.1.0/24,10.1.1.30-10.1.1.50
// 127.0.0.1
// 192.168.1.0/24
// 192.168.1.1-192.168.1.10
func NewIPTree(ips string) *IPTree {
iptree := &IPTree{
tree: avltree.NewWith(utils.UInt32Comparator),
}
ipArr := strings.Split(ips, ",")
for _, ip := range ipArr {
if strings.Contains(ip, "/") { // x.x.x.x/24
_, ipNet, err := net.ParseCIDR(ip)
if err != nil {
fmt.Println("Error parsing CIDR:", err)
continue
}
minIP := ipNet.IP.Mask(ipNet.Mask).String()
maxIP := calculateMaxIP(ipNet).String()
iptree.Add(minIP, maxIP)
} else if strings.Contains(ip, "-") { // x.x.x.x-y.y.y.y
minAndMax := strings.Split(ip, "-")
iptree.Add(minAndMax[0], minAndMax[1])
} else { // single ip
iptree.Add(ip, ip)
}
}
return iptree
}
func calculateMaxIP(ipNet *net.IPNet) net.IP {
maxIP := make(net.IP, len(ipNet.IP))
copy(maxIP, ipNet.IP)
for i := range maxIP {
maxIP[i] |= ^ipNet.Mask[i]
}
return maxIP
}

171
core/iptree_test.go Normal file
View File

@@ -0,0 +1,171 @@
package openp2p
import (
"bytes"
"encoding/binary"
"net"
"testing"
)
func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func wrapBenchmarkContains(t *testing.B, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func TestAllInputFormat(t *testing.T) {
iptree := NewIPTree("219.137.185.70,127.0.0.1,127.0.0.0/8,192.168.1.0/24,192.168.3.100-192.168.3.255,192.168.100.0-192.168.200.255")
wrapTestContains(t, iptree, "127.0.0.1", true)
wrapTestContains(t, iptree, "127.0.0.2", true)
wrapTestContains(t, iptree, "127.1.1.1", true)
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
wrapTestContains(t, iptree, "192.168.1.2", true)
wrapTestContains(t, iptree, "192.168.2.2", false)
wrapTestContains(t, iptree, "192.168.3.1", false)
wrapTestContains(t, iptree, "192.168.3.100", true)
wrapTestContains(t, iptree, "192.168.3.255", true)
wrapTestContains(t, iptree, "192.168.150.1", true)
wrapTestContains(t, iptree, "192.168.250.1", false)
}
func TestSingleIP(t *testing.T) {
iptree := NewIPTree("")
iptree.Add("219.137.185.70", "219.137.185.70")
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
}
func TestWrongSegment(t *testing.T) {
iptree := NewIPTree("")
inserted := iptree.Add("87.251.75.0", "82.251.75.255")
if inserted {
t.Errorf("TestWrongSegment failed\n")
}
}
func TestSegment2(t *testing.T) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Print()
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Print()
iptree.Add("10.1.1.90", "10.1.1.110") // interset
iptree.Print()
t.Logf("ipTree size:%d\n", iptree.Size())
wrapTestContains(t, iptree, "10.1.1.40", true)
wrapTestContains(t, iptree, "10.1.5.50", true)
wrapTestContains(t, iptree, "10.1.6.50", true)
wrapTestContains(t, iptree, "10.1.7.50", true)
wrapTestContains(t, iptree, "10.1.2.50", true)
wrapTestContains(t, iptree, "10.1.3.50", true)
wrapTestContains(t, iptree, "10.1.1.60", true)
wrapTestContains(t, iptree, "10.1.1.90", true)
wrapTestContains(t, iptree, "10.1.1.110", true)
wrapTestContains(t, iptree, "10.1.1.250", true)
wrapTestContains(t, iptree, "10.1.2.60", true)
wrapTestContains(t, iptree, "10.1.100.30", false)
wrapTestContains(t, iptree, "10.1.200.30", false)
iptree.Add("10.0.0.0", "10.255.255.255") // will merge all segment
iptree.Print()
if iptree.Size() != 1 {
t.Errorf("merge ip segment error\n")
}
}
func BenchmarkBuildipTree20k(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 1)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("ipTree size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("ipTree size:%d\n", iptree.Size())
iptree.Clear()
t.Logf("clear. ipTree size:%d\n", iptree.Size())
}
func BenchmarkQuery(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 100)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("ipTree size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("ipTree size:%d\n", iptree.Size())
t.ResetTimer()
queryNum := 100 * 10000
for i := 0; i < queryNum; i++ {
iptree.ContainsInt(minIP + uint32(i))
wrapBenchmarkContains(t, iptree, "10.1.5.55", true)
wrapBenchmarkContains(t, iptree, "10.1.1.1", true)
wrapBenchmarkContains(t, iptree, "10.1.5.200", false)
wrapBenchmarkContains(t, iptree, "200.1.1.1", false)
}
t.Logf("query list:%d\n", queryNum*4)
}

View File

@@ -36,9 +36,8 @@ func init() {
}
const (
LogFile = iota
LogFile = 1 << iota
LogConsole
LogFileAndConsole
)
type logger struct {
@@ -140,10 +139,10 @@ func (l *logger) Printf(level LogLevel, format string, params ...interface{}) {
}
pidAndLevel := []interface{}{l.pid, loglevel[level]}
params = append(pidAndLevel, params...)
if l.mode == LogFile || l.mode == LogFileAndConsole {
if l.mode & LogFile != 0 {
l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
if l.mode & LogConsole != 0 {
l.stdLogger.Printf("%d %s "+format+l.lineEnding, params...)
}
}
@@ -157,10 +156,10 @@ func (l *logger) Println(level LogLevel, params ...interface{}) {
pidAndLevel := []interface{}{l.pid, " ", loglevel[level], " "}
params = append(pidAndLevel, params...)
params = append(params, l.lineEnding)
if l.mode == LogFile || l.mode == LogFileAndConsole {
if l.mode & LogFile != 0 {
l.loggers[0].Print(params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
if l.mode & LogConsole != 0 {
l.stdLogger.Print(params...)
}
}

View File

@@ -7,7 +7,6 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"
reuse "github.com/openp2p-cn/go-reuseport"
@@ -84,7 +83,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
return "", 0, err
}
natRsp := NatDetectRsp{}
err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
return natRsp.IP, natRsp.Port, nil
}
@@ -113,20 +112,18 @@ func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int,
func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATPMP int) {
var echoConn *net.UDPConn
var wg sync.WaitGroup
wg.Add(1)
gLog.Println(LvDEBUG, "echo server start")
var err error
echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort})
if err != nil { // listen error
gLog.Println(LvERROR, "echo server listen error:", err)
return
}
defer echoConn.Close()
go func() {
gLog.Println(LvDEBUG, "echo server start")
var err error
echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort})
if err != nil {
gLog.Println(LvERROR, "echo server listen error:", err)
return
}
buf := make([]byte, 1600)
// close outside for breaking the ReadFromUDP
// wait 5s for echo testing
wg.Done()
// wait 30s for echo testing
buf := make([]byte, 1600)
echoConn.SetReadDeadline(time.Now().Add(time.Second * 30))
n, addr, err := echoConn.ReadFromUDP(buf)
if err != nil {
@@ -135,8 +132,6 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn.WriteToUDP(buf[0:n], addr)
gLog.Println(LvDEBUG, "echo server end")
}()
wg.Wait() // wait echo udp
defer echoConn.Close()
// testing for public ip
for i := 0; i < 2; i++ {
if i == 1 {

View File

@@ -13,24 +13,13 @@ func Run() {
rand.Seed(time.Now().UnixNano())
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFile|LogConsole)
// TODO: install sub command, deamon process
if len(os.Args) > 1 {
switch os.Args[1] {
case "version", "-v", "--version":
fmt.Println(OpenP2PVersion)
return
case "update":
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
d := daemon{}
err := d.Control("restart", targetPath, nil)
if err != nil {
gLog.Println(LvERROR, "restart service error:", err)
} else {
gLog.Println(LvINFO, "restart service ok.")
}
return
case "install":
install()
return
@@ -74,7 +63,7 @@ var network *P2PNetwork
func RunAsModule(baseDir string, token string, bw int, logLevel int) *P2PNetwork {
rand.Seed(time.Now().UnixNano())
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFile|LogConsole)
parseParams("")

View File

@@ -17,6 +17,7 @@ type p2pApp struct {
listener net.Listener
listenerUDP *net.UDPConn
tunnel *P2PTunnel
iptree *IPTree
rtid uint64 // relay tunnelID
relayNode string
relayMode string
@@ -64,6 +65,15 @@ func (app *p2pApp) listenTCP() error {
}
break
}
// check white list
if app.config.Whitelist != "" {
remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
if !app.iptree.Contains(remoteIP) {
conn.Close()
gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
continue
}
}
oConn := overlayConn{
tunnel: app.tunnel,
connTCP: conn,

View File

@@ -3,6 +3,7 @@ package openp2p
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/json"
"errors"
@@ -10,6 +11,7 @@ import (
"math/rand"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
@@ -25,8 +27,12 @@ var (
const (
retryLimit = 20
retryInterval = 10 * time.Second
dtma = 20
ddtma = 5
)
// golang not support float64 const
var (
ma10 float64 = 1.0 / 10
ma20 float64 = 1.0 / 20
)
type P2PNetwork struct {
@@ -38,12 +44,12 @@ type P2PNetwork struct {
writeMtx sync.Mutex
hbTime time.Time
// for sync server time
t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds
dtma int64
ddt int64 // differential of dt
t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds
ddtma int64
ddt int64 // differential of dt
// msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID
msgMap map[uint64]chan pushMsg //key: nodeID
msgMapMtx sync.Mutex
config NetworkConfig
allTunnels sync.Map
@@ -51,6 +57,13 @@ type P2PNetwork struct {
limiter *BandwidthLimiter
}
type pushMsg struct {
data []byte
ts time.Time
}
const msgExpiredTime = time.Minute
func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
if instance == nil {
once.Do(func() {
@@ -58,17 +71,24 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
restartCh: make(chan bool, 2),
online: false,
running: true,
msgMap: make(map[uint64]chan []byte),
msgMap: make(map[uint64]chan pushMsg),
limiter: newBandwidthLimiter(config.ShareBandwidth),
dt: 0,
ddt: 0,
}
instance.msgMap[0] = make(chan []byte) // for gateway
instance.msgMap[0] = make(chan pushMsg) // for gateway
if config != nil {
instance.config = *config
}
instance.init()
go instance.run()
go func() {
for {
instance.refreshIPv6(false)
time.Sleep(time.Hour)
}
}()
cleanTempFiles()
})
}
return instance
@@ -112,30 +132,18 @@ func (pn *P2PNetwork) runAll() {
allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps
for _, config := range allApps {
if config.nextRetryTime.After(time.Now()) {
continue
}
if config.Enabled == 0 {
if config.nextRetryTime.After(time.Now()) || config.Enabled == 0 || config.retryNum >= retryLimit {
continue
}
if config.AppName == "" {
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)
config.AppName = config.ID()
}
appExist := false
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
appExist = true
if app.isActive() {
if i, ok := pn.apps.Load(config.ID()); ok {
if app := i.(*p2pApp); app.isActive() {
continue
}
}
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum >= retryLimit {
continue
}
if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
@@ -160,6 +168,7 @@ func (pn *P2PNetwork) runAll() {
}
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start")
pn.wgReconnect.Add(1)
@@ -175,53 +184,56 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode)
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode)
// request a relay node or specify manually(TODO)
pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode})
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout)
if head == nil {
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
gLog.Printf(LvERROR, "MsgRelayNodeReq error")
return nil, 0, "", errors.New("MsgRelayNodeReq error")
}
gLog.Printf(LvINFO, "got relay node:%s", rsp.RelayName)
relayConfig := config
relayConfig.PeerNode = rsp.RelayName
relayConfig.peerToken = rsp.RelayToken
relayMode := "private"
if config.RelayNode == "" {
pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode})
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout)
if head == nil {
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
if err := json.Unmarshal(body, &rsp); err != nil {
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
gLog.Printf(LvERROR, "MsgRelayNodeReq error")
return nil, 0, "", errors.New("MsgRelayNodeReq error")
}
gLog.Printf(LvINFO, "got relay node:%s", rsp.RelayName)
relayConfig.PeerNode = rsp.RelayName
relayConfig.peerToken = rsp.RelayToken
relayMode = rsp.Mode
} else {
relayConfig.PeerNode = config.RelayNode
}
///
t, err := pn.addDirectTunnel(relayConfig, 0)
if err != nil {
gLog.Println(LvERROR, "direct connect error:", err)
return nil, 0, "", err
return nil, 0, "", ErrConnectRelayNode // relay offline will stop retry
}
// notify peer addRelayTunnel
req := AddRelayTunnelReq{
From: pn.config.Node,
RelayName: rsp.RelayName,
RelayToken: rsp.RelayToken,
RelayName: relayConfig.PeerNode,
RelayToken: relayConfig.peerToken,
}
gLog.Printf(LvINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName)
gLog.Printf(LvINFO, "push relay %s---------%s", config.PeerNode, relayConfig.PeerNode)
pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req)
// wait relay ready
head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value
head, body := pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount)
if head == nil {
gLog.Printf(LvERROR, "read MsgPushAddRelayTunnelRsp error")
return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error")
}
rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
if err = json.Unmarshal(body, &rspID); err != nil {
return nil, 0, "", errors.New("peer connect relayNode error")
}
return t, rspID.ID, rsp.Mode, err
return t, rspID.ID, relayMode, err
}
// use *AppConfig to save status
@@ -233,7 +245,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
}
// check if app already exist?
appExist := false
_, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
_, ok := pn.apps.Load(config.ID())
if ok {
appExist = true
}
@@ -299,11 +311,13 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
key: appKey,
tunnel: t,
config: config,
iptree: NewIPTree(config.Whitelist),
rtid: rtid,
relayNode: relayNode,
relayMode: relayMode,
hbTime: time.Now()}
pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app)
pn.apps.Store(config.ID(), &app)
gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id)
if err == nil {
go app.listen()
}
@@ -314,16 +328,37 @@ func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort)
defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort)
// close the apps of this config
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
gLog.Printf(LvINFO, "app %s exist, delete it", app.config.AppName)
app.close()
pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
pn.apps.Delete(config.ID())
}
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) {
func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) {
// find existing tunnel to peer
pn.allTunnels.Range(func(id, i interface{}) bool {
tmpt := i.(*P2PTunnel)
if tmpt.config.PeerNode == config.PeerNode {
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := tmpt.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
tmpt.close()
} else {
t = tmpt
}
return false
}
return true
})
return t
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
isClient := false
@@ -332,64 +367,29 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
tid = rand.Uint64()
isClient = true
}
exist := false
// find existing tunnel to peer
var t *P2PTunnel
pn.allTunnels.Range(func(id, i interface{}) bool {
t = i.(*P2PTunnel)
if t.config.PeerNode == config.PeerNode {
// server side force close existing tunnel
if !isClient {
t.close()
return false
}
// client side checking
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := t.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
t.close()
} else {
// active
exist = true
}
return false
}
return true
})
if exist {
return t, nil
}
// create tunnel if not exist
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50)
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan pushMsg, 50)
pn.msgMapMtx.Unlock()
// server side
if !isClient {
err := pn.newTunnel(t, tid, isClient)
t, err = pn.newTunnel(config, tid, isClient)
return t, err // always return
}
// client side
// peer info
initErr := t.requestPeerInfo()
initErr := pn.requestPeerInfo(&config)
if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr
}
var err error
// try TCP6
if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) {
if IsIPv6(config.peerIPv6) && IsIPv6(gConf.IPv6()) {
gLog.Println(LvINFO, "try TCP6")
t.config.linkMode = LinkModeTCP6
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
config.linkMode = LinkModeTCP6
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
@@ -397,59 +397,77 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
// TODO: try UDP6
// try TCP4
if t.config.hasIPv4 == 1 || t.pn.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 || t.pn.config.hasUPNPorNATPMP == 1 {
if config.hasIPv4 == 1 || pn.config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 || pn.config.hasUPNPorNATPMP == 1 {
gLog.Println(LvINFO, "try TCP4")
t.config.linkMode = LinkModeTCP4
if t.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 {
t.config.isUnderlayServer = 0
config.linkMode = LinkModeTCP4
if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 {
config.isUnderlayServer = 0
} else {
t.config.isUnderlayServer = 1
config.isUnderlayServer = 1
}
if err = pn.newTunnel(t, tid, isClient); err == nil {
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
// TODO: try UDP4
// try TCPPunch
if t.config.peerNatType == NATCone && t.pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
t.config.linkMode = LinkModeTCPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
config.linkMode = LinkModeTCPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
}
}
}
// try UDPPunch
if t.config.peerNatType == NATCone || t.pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
t.config.linkMode = LinkModeUDPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone || pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
config.linkMode = LinkModeUDPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
if !(config.peerNatType == NATCone && pn.config.natType == NATCone) { // not cone2cone, no more try
break
}
}
return nil, ErrorHandshake // only ErrorHandshake will try relay
}
func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) {
if isClient { // only client side find existing tunnel
if existTunnel := pn.findTunnel(&config); existTunnel != nil {
return existTunnel, nil
}
}
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
t.initPort()
if isClient {
if err := t.connect(); err != nil {
if err = t.connect(); err != nil {
gLog.Println(LvERROR, "p2pTunnel connect error:", err)
return err
return
}
} else {
if err := t.listen(); err != nil {
if err = t.listen(); err != nil {
gLog.Println(LvERROR, "p2pTunnel listen error:", err)
return err
return
}
}
// store it when success
gLog.Printf(LvDEBUG, "store tunnel %d", tid)
pn.allTunnels.Store(tid, t)
return nil
return
}
func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start")
@@ -480,7 +498,17 @@ func (pn *P2PNetwork) init() error {
gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/api/v1/login"
config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
caCertPool, err := x509.SystemCertPool()
if err != nil {
gLog.Println(LvERROR, "Failed to load system root CAs:", err)
} else {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM([]byte(rootCA))
caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1))
config := tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query()
@@ -520,13 +548,13 @@ func (pn *P2PNetwork) init() error {
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
pn.config.publicIPv6 = rsp.IP.String()
gConf.setIPv6(rsp.IP.String())
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = pn.config.publicIPv6
req.IPv6 = gConf.IPv6()
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
@@ -552,9 +580,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
case MsgLogin:
// gLog.Println(LevelINFO,string(msg))
rsp := LoginRsp{}
err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong login response:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return
}
if rsp.Error != 0 {
@@ -577,27 +604,30 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
rtt := pn.hbTime.UnixNano() - pn.t1
t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8]))
dt := pn.t1 + rtt/2 - t2
if pn.dtma == 0 {
pn.dtma = dt
} else {
if pn.dt != 0 {
ddt := dt - pn.dt
// if pn.ddt == 0 {
if pn.ddtma > 0 && (ddt > (pn.ddtma+pn.ddtma/3) || ddt < (pn.ddtma-pn.ddtma/3)) {
newdt := pn.dt + pn.ddtma
gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond))
dt = newdt
}
pn.ddt = ddt
// } else {
// pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow
// }
pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow
if pn.ddtma == 0 {
pn.ddtma = pn.ddt
} else {
pn.ddtma = int64(float64(pn.ddtma)*(1-ma10) + float64(pn.ddt)*ma10) // avoid int64 overflow
}
}
pn.dt = dt
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond))
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond))
case MsgPush:
handlePush(pn, head.SubType, msg)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[0]
pn.msgMapMtx.Unlock()
ch <- msg
ch <- pushMsg{data: msg, ts: time.Now()}
return
}
}
@@ -696,19 +726,25 @@ func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout
gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType)
return
case msg := <-ch:
if msg.ts.Before(time.Now().Add(-msgExpiredTime)) {
gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType)
continue
}
head = &openP2PHeader{}
err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, head)
err := binary.Read(bytes.NewReader(msg.data[:openP2PHeaderSize]), binary.LittleEndian, head)
if err != nil {
gLog.Println(LvERROR, "read msg error:", err)
break
}
if head.MainType != mainType || head.SubType != subType {
gLog.Printf(LvDEBUG, "read msg type error %d:%d, requeue it", head.MainType, head.SubType)
ch <- msg
continue
}
if mainType == MsgPush {
body = msg[openP2PHeaderSize+PushHeaderSize:]
body = msg.data[openP2PHeaderSize+PushHeaderSize:]
} else {
body = msg[openP2PHeaderSize:]
body = msg.data[openP2PHeaderSize:]
}
return
}
@@ -728,21 +764,52 @@ func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) {
// ipv6 will expired need to refresh.
func (pn *P2PNetwork) refreshIPv6(force bool) {
if !force && !IsIPv6(pn.config.publicIPv6) { // not support ipv6, not refresh
if !force && !IsIPv6(gConf.IPv6()) { // not support ipv6, not refresh
return
}
client := &http.Client{Timeout: time.Second * 10}
r, err := client.Get("http://6.ipw.cn")
if err != nil {
gLog.Println(LvDEBUG, "refreshIPv6 error:", err)
return
for i := 0; i < 3; i++ {
client := &http.Client{Timeout: time.Second * 10}
r, err := client.Get("http://6.ipw.cn")
if err != nil {
gLog.Println(LvDEBUG, "refreshIPv6 error:", err)
continue
}
defer r.Body.Close()
buf := make([]byte, 1024)
n, err := r.Body.Read(buf)
if n <= 0 {
gLog.Println(LvINFO, "refreshIPv6 error:", err, n)
continue
}
gConf.setIPv6(string(buf[:n]))
break
}
defer r.Body.Close()
buf := make([]byte, 1024)
n, err := r.Body.Read(buf)
if n <= 0 {
gLog.Println(LvINFO, "refreshIPv6 error:", err, n)
return
}
pn.config.publicIPv6 = string(buf[:n])
}
func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error {
// request peer info
pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode})
head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, ClientAPITimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
if err := json.Unmarshal(body, &rsp); err != nil {
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
config.peerVersion = rsp.Version
config.hasIPv4 = rsp.HasIPv4
config.peerIP = rsp.IPv4
config.peerIPv6 = rsp.IPv6
config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
config.peerNatType = rsp.NatType
///
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"math/rand"
"net"
"reflect"
"sync"
"time"
)
@@ -32,34 +33,6 @@ type P2PTunnel struct {
punchTs uint64
}
func (t *P2PTunnel) requestPeerInfo() error {
// request peer info
t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode})
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong QueryPeerInfoRsp:%s", err)
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
t.config.peerVersion = rsp.Version
t.config.hasIPv4 = rsp.HasIPv4
t.config.peerIP = rsp.IPv4
t.config.peerIPv6 = rsp.IPv6
t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
t.config.peerNatType = rsp.NatType
///
return nil
}
func (t *P2PTunnel) initPort() {
t.running = true
t.hbMtx.Lock()
@@ -67,9 +40,6 @@ func (t *P2PTunnel) initPort() {
t.hbMtx.Unlock()
t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time
localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param
if t.config.linkMode == LinkModeTCP6 {
t.pn.refreshIPv6(false)
}
if t.config.linkMode == LinkModeTCP6 || t.config.linkMode == LinkModeTCP4 {
t.coneLocalPort = t.pn.config.TCPPort
t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort
@@ -101,7 +71,7 @@ func (t *P2PTunnel) connect() error {
ConeNatPort: t.coneNatPort,
NatType: t.pn.config.natType,
HasIPv4: t.pn.config.hasIPv4,
IPv6: t.pn.config.publicIPv6,
IPv6: gConf.IPv6(),
HasUPNPorNATPMP: t.pn.config.hasUPNPorNATPMP,
ID: t.id,
AppKey: appKey,
@@ -113,14 +83,13 @@ func (t *P2PTunnel) connect() error {
req.Token = t.pn.config.Token
}
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, HandshakeTimeout*3)
if head == nil {
return errors.New("connect error")
}
rsp := PushConnectRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err)
if err := json.Unmarshal(body, &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return err
}
// gLog.Println(LevelINFO, rsp)
@@ -135,7 +104,7 @@ func (t *P2PTunnel) connect() error {
t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP
t.punchTs = rsp.PunchTs
err = t.start()
err := t.start()
if err != nil {
gLog.Println(LvERROR, "handshake error:", err)
err = ErrorHandshake
@@ -156,23 +125,19 @@ func (t *P2PTunnel) setRun(running bool) {
}
func (t *P2PTunnel) isActive() bool {
if !t.isRuning() {
if !t.isRuning() || t.conn == nil {
return false
}
t.hbMtx.Lock()
defer t.hbMtx.Unlock()
return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout))
return time.Now().Before(t.hbTime.Add(TunnelHeartbeatTime * 2))
}
func (t *P2PTunnel) checkActive() bool {
hbt := time.Now()
t.hbMtx.Lock()
if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) {
t.hbMtx.Unlock()
if !t.isActive() {
return false
}
t.hbMtx.Unlock()
// hbtime within TunnelHeartbeatTime, check it now
hbt := time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
isActive := false
// wait at most 5s
@@ -184,6 +149,7 @@ func (t *P2PTunnel) checkActive() bool {
t.hbMtx.Unlock()
time.Sleep(time.Millisecond * 100)
}
gLog.Printf(LvINFO, "checkActive %t. hbtime=%d", isActive, t.hbTime)
return isActive
}
@@ -307,7 +273,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
return nil, fmt.Errorf("quic listen error:%s", e)
}
}
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3)
gLog.Println(LvDEBUG, "quic dial to ", t.ra.String())
qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil {
@@ -355,7 +321,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
// client side
if t.config.linkMode == LinkModeTCP4 {
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3)
} else { //tcp punch should sleep for punch the same time
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
@@ -411,7 +377,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
}
//else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3)
gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6)
qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort)
if err != nil {
@@ -451,6 +417,7 @@ func (t *P2PTunnel) readLoop() {
}
switch head.SubType {
case MsgTunnelHeartbeat:
t.hbTime = time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil)
gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id)
case MsgTunnelHeartbeatAck:
@@ -492,9 +459,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.relay(tunnelID, body[8:])
case MsgRelayHeartbeat:
req := RelayHeartbeat{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID)
@@ -514,9 +480,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq:
req := OverlayConnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
// app connect only accept token(not relay totp token), avoid someone using the share relay node's token
@@ -559,9 +524,8 @@ func (t *P2PTunnel) readLoop() {
go oConn.run()
case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
overlayID := req.ID
@@ -619,8 +583,7 @@ func (t *P2PTunnel) listen() error {
t.punchTs = rsp.PunchTs
// only private node set ipv6
if t.config.fromToken == t.pn.config.Token {
t.pn.refreshIPv6(false)
rsp.IPv6 = t.pn.config.publicIPv6
rsp.IPv6 = gConf.IPv6()
}
t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp)

View File

@@ -10,10 +10,11 @@ import (
"time"
)
const OpenP2PVersion = "3.9.1"
const OpenP2PVersion = "3.10.9"
const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0"
const SymmetricSimultaneouslySendVersion = "3.10.7"
const (
IfconfigPort1 = 27180
@@ -135,25 +136,25 @@ const (
const (
ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15
TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s
TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond
HandshakeTimeout = time.Second * 5
PeerAddRelayTimeount = HandshakeTimeout * 2
HandshakeTimeout = time.Second * 10
PeerAddRelayTimeount = time.Second * 30 // peer need times
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
RetryInterval = time.Second * 30
Cone2ConePunchMaxRetry = 1
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5
UDPReadTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10
UnderlayConnectTimeout = time.Second * 10
MaxDirectTry = 3
PunchTsDelay = time.Second * 2
PunchTsDelay = time.Second * 3
)
// NATNone has public ip
@@ -350,9 +351,10 @@ type AppInfo struct {
AppName string `json:"appName,omitempty"`
Error string `json:"error,omitempty"`
Protocol string `json:"protocol,omitempty"`
Whitelist string `json:"whitelist,omitempty"`
SrcPort int `json:"srcPort,omitempty"`
Protocol0 string `json:"protocol0,omitempty"`
SrcPort0 int `json:"srcPort0,omitempty"`
SrcPort0 int `json:"srcPort0,omitempty"` // srcport+protocol is uneque, use as old app id
NatType int `json:"natType,omitempty"`
PeerNode string `json:"peerNode,omitempty"`
DstPort int `json:"dstPort,omitempty"`
@@ -439,3 +441,113 @@ type QueryPeerInfoRsp struct {
IPv6 string `json:"IPv6,omitempty"` // if public relay node, ipv6 not set
HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"`
}
const rootCA = `-----BEGIN CERTIFICATE-----
MIIDhTCCAm0CFHm0cd8dnGCbUW/OcS56jf0gvRk7MA0GCSqGSIb3DQEBCwUAMH4x
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEGA1UECgwKb3BlbnAycC5jbjET
MBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwKb3BlbnAycC5jbjEjMCEGCSqG
SIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20wIBcNMjMwODAxMDkwMjMwWhgP
MjEyMzA3MDgwOTAyMzBaMH4xCzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEG
A1UECgwKb3BlbnAycC5jbjETMBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwK
b3BlbnAycC5jbjEjMCEGCSqGSIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20w
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDWg8wPy5hBLUaY4WOXayKu
+magEz1LAY0krzXYSZaSCvGMwA0cervwAqgKfiiZEhho5UNA5iVOJ6bO1RL9H7Vp
4HuW9BttDU/NQHguD8pyqx06Kaosz5LRw8USz1BCWWFdmi8Mv4I0omtd7m6lbWnY
nrjQKLYPahPW481jUfJPqR6wUTnBuBMr2ZAGqmFR4Lhqs9B1P9GeBfDWNwVApJUC
VEhbElukRJxdUvWeJ5+HMENKQcHCTTgmQbmDLMobHXs3Xf7fT9qC76wOe9LFHI6L
dAww9gryQhxWauQl1NO8aGJTFu+3wgnKBdTMJmF/1iuZYXJOCR1solwqU1hCgBsj
AgMBAAEwDQYJKoZIhvcNAQELBQADggEBADp153YNVN8p6/3PLnXxHBDeDViAfeQd
VJmy8eH1LTq/xtUY71HGSpL7iIBNoQdDTHfsg3c6ZANBCxbO/7AhFAzPt1aK8eHy
XuEiW0Z6R8np1Khh3alCOfD15tKcjok//Wxisbz+YItlbDus/eWRbLGB3HGrzn4l
GB18jw+G7o4U3rGX8agHqVGQEd06gk1ZaprASpTGwSsv4A5ehosjT1d7re8Z5eD4
RVtXS+DplMClQ5QSlv3StwcWOsjyiAimNfLEU5xoEfq17yOJUTU1OTL4YOt16QUc
C1tnzFr3k/ioqFR7cnyzNrbjlfPOmO9l2WReEbMP3bvaSHm6EcpJKS8=
-----END CERTIFICATE-----`
const ISRGRootX1 = `-----BEGIN CERTIFICATE-----
MIIEJjCCAw6gAwIBAgISAztStWq026ej0RCsk3ErbUdPMA0GCSqGSIb3DQEBCwUA
MDIxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MQswCQYDVQQD
EwJSMzAeFw0yMzA4MDQwODUyMjlaFw0yMzExMDIwODUyMjhaMBcxFTATBgNVBAMM
DCoub3BlbnAycC5jbjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABPRdkgLV2FA+
3g/GjcA9UcfDfIFYgofSTNbOCQFIiQVMXrTgAToF1/tWaS2LOuysZcCX6OE7SCeG
lQ+0g+L2qvujggIaMIICFjAOBgNVHQ8BAf8EBAMCB4AwHQYDVR0lBBYwFAYIKwYB
BQUHAwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFIdL5LNQC+X4
8r6u+3NlM238Vmk5MB8GA1UdIwQYMBaAFBQusxe3WFbLrlAJQOYfr52LFMLGMFUG
CCsGAQUFBwEBBEkwRzAhBggrBgEFBQcwAYYVaHR0cDovL3IzLm8ubGVuY3Iub3Jn
MCIGCCsGAQUFBzAChhZodHRwOi8vcjMuaS5sZW5jci5vcmcvMCMGA1UdEQQcMBqC
DCoub3BlbnAycC5jboIKb3BlbnAycC5jbjATBgNVHSAEDDAKMAgGBmeBDAECATCC
AQQGCisGAQQB1nkCBAIEgfUEgfIA8AB2AHoyjFTYty22IOo44FIe6YQWcDIThU07
0ivBOlejUutSAAABib/2fCgAAAQDAEcwRQIhAJzf9XNe0cu9CNYLLqtDCZZMqI6u
qsHrnnXcFQW23ioZAiAgwKp5DwZw9RmF19KOjD6lYJfTxc+anJUuWAlMwu1HYQB2
AK33vvp8/xDIi509nB4+GGq0Zyldz7EMJMqFhjTr3IKKAAABib/2fEEAAAQDAEcw
RQIgKeI7DopyzFXPdRQZKZrHVqfXQ8OipvlKXd5xRnKFjH4CIQDMM+TU+LOux8xK
1NlTiSs9DhQI/eU3ZXKxSQAqF50RnTANBgkqhkiG9w0BAQsFAAOCAQEATqZ+H2NT
cv4FzArD/Krlnur1OTitvpubRWM+ClB9Cr6pvPVB7Dp0/ALxu35ZmCtrzdJWTfmp
lHxU4nPXRPVjuPRNXooSyH//KTfHyf32919PQOi/qc/QEAuIzkGLJg0dIPKLxaNK
CiTWU+2iAYSHBgCWulfLX/RYNbBZQ9w0xIm3XhuMjCF/omG8ofuz1DmiRVR+17JA
nuDXQkxm7KhmbxSA4PsLwzvIWA8Wk44ZK7uncgRY3WIUXcVRELSFA5LuH67TOwag
al6iG56KW1N2Yy9YmeG27SYvHZYkjmuJ8NEy7Ku+Mi6gwO4hs0CYr2wtUacPfjKF
aYTGWSt6Pt8kmw==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFFjCCAv6gAwIBAgIRAJErCErPDBinU/bWLiWnX1owDQYJKoZIhvcNAQELBQAw
TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh
cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMjAwOTA0MDAwMDAw
WhcNMjUwOTE1MTYwMDAwWjAyMQswCQYDVQQGEwJVUzEWMBQGA1UEChMNTGV0J3Mg
RW5jcnlwdDELMAkGA1UEAxMCUjMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC7AhUozPaglNMPEuyNVZLD+ILxmaZ6QoinXSaqtSu5xUyxr45r+XXIo9cP
R5QUVTVXjJ6oojkZ9YI8QqlObvU7wy7bjcCwXPNZOOftz2nwWgsbvsCUJCWH+jdx
sxPnHKzhm+/b5DtFUkWWqcFTzjTIUu61ru2P3mBw4qVUq7ZtDpelQDRrK9O8Zutm
NHz6a4uPVymZ+DAXXbpyb/uBxa3Shlg9F8fnCbvxK/eG3MHacV3URuPMrSXBiLxg
Z3Vms/EY96Jc5lP/Ooi2R6X/ExjqmAl3P51T+c8B5fWmcBcUr2Ok/5mzk53cU6cG
/kiFHaFpriV1uxPMUgP17VGhi9sVAgMBAAGjggEIMIIBBDAOBgNVHQ8BAf8EBAMC
AYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMBIGA1UdEwEB/wQIMAYB
Af8CAQAwHQYDVR0OBBYEFBQusxe3WFbLrlAJQOYfr52LFMLGMB8GA1UdIwQYMBaA
FHm0WeZ7tuXkAXOACIjIGlj26ZtuMDIGCCsGAQUFBwEBBCYwJDAiBggrBgEFBQcw
AoYWaHR0cDovL3gxLmkubGVuY3Iub3JnLzAnBgNVHR8EIDAeMBygGqAYhhZodHRw
Oi8veDEuYy5sZW5jci5vcmcvMCIGA1UdIAQbMBkwCAYGZ4EMAQIBMA0GCysGAQQB
gt8TAQEBMA0GCSqGSIb3DQEBCwUAA4ICAQCFyk5HPqP3hUSFvNVneLKYY611TR6W
PTNlclQtgaDqw+34IL9fzLdwALduO/ZelN7kIJ+m74uyA+eitRY8kc607TkC53wl
ikfmZW4/RvTZ8M6UK+5UzhK8jCdLuMGYL6KvzXGRSgi3yLgjewQtCPkIVz6D2QQz
CkcheAmCJ8MqyJu5zlzyZMjAvnnAT45tRAxekrsu94sQ4egdRCnbWSDtY7kh+BIm
lJNXoB1lBMEKIq4QDUOXoRgffuDghje1WrG9ML+Hbisq/yFOGwXD9RiX8F6sw6W4
avAuvDszue5L3sz85K+EC4Y/wFVDNvZo4TYXao6Z0f+lQKc0t8DQYzk1OXVu8rp2
yJMC6alLbBfODALZvYH7n7do1AZls4I9d1P4jnkDrQoxB3UqQ9hVl3LEKQ73xF1O
yK5GhDDX8oVfGKF5u+decIsH4YaTw7mP3GFxJSqv3+0lUFJoi5Lc5da149p90Ids
hCExroL1+7mryIkXPeFM5TgO9r0rvZaBFOvV2z0gp35Z0+L4WPlbuEjN/lxPFin+
HlUjr8gRsI3qfJOQFy/9rKIJR0Y/8Omwt/8oTWgy1mdeHmmjk7j1nYsvC9JSQ6Zv
MldlTTKB3zhThV1+XWYp6rjd5JW1zbVWEkLNxE7GJThEUG3szgBVGP7pSWTUTsqX
nLRbwHOoq7hHwg==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAw
TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh
cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4
WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJu
ZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBY
MTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygc
h77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+
0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6U
A5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sW
T8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyH
B5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UC
B5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUv
KBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWn
OlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTn
jh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbw
qHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CI
rU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNV
HRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkq
hkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZL
ubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ
3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KK
NFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5
ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7Ur
TkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdC
jNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVc
oyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq
4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPA
mRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57d
emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=
-----END CERTIFICATE-----
`

View File

@@ -20,8 +20,7 @@ func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16,
func UDPRead(conn *net.UDPConn, timeout time.Duration) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
if timeout > 0 {
deadline := time.Now().Add(timeout)
err = conn.SetReadDeadline(deadline)
err = conn.SetReadDeadline(time.Now().Add(timeout))
if err != nil {
gLog.Println(LvERROR, "SetReadDeadline error")
return nil, nil, nil, 0, err

View File

@@ -77,7 +77,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err)
return nil, err
@@ -90,7 +90,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel)
if err != nil {
return nil, err
}
l.SetDeadline(time.Now().Add(HandshakeTimeout))
l.SetDeadline(time.Now().Add(CheckActiveTimeout))
c, err := l.Accept()
defer l.Close()
if err != nil {
@@ -103,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e
var c net.Conn
var err error
if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
} else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout)
}
if err != nil {

View File

@@ -5,6 +5,7 @@ import (
"archive/zip"
"compress/gzip"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
@@ -16,12 +17,22 @@ import (
"time"
)
func update(host string, port int) {
func update(host string, port int) error {
gLog.Println(LvINFO, "update start")
defer gLog.Println(LvINFO, "update end")
caCertPool, err := x509.SystemCertPool()
if err != nil {
gLog.Println(LvERROR, "Failed to load system root CAs:", err)
} else {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM([]byte(rootCA))
caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1))
c := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{RootCAs: caCertPool,
InsecureSkipVerify: false},
},
Timeout: time.Second * 30,
}
@@ -30,33 +41,33 @@ func update(host string, port int) {
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node))
if err != nil {
gLog.Println(LvERROR, "update:query update list failed:", err)
return
return err
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
gLog.Println(LvERROR, "get update info error:", rsp.Status)
return
return err
}
rspBuf, err := ioutil.ReadAll(rsp.Body)
if err != nil {
gLog.Println(LvERROR, "update:read update list failed:", err)
return
return err
}
updateInfo := UpdateInfo{}
err = json.Unmarshal(rspBuf, &updateInfo)
if err != nil {
if err = json.Unmarshal(rspBuf, &updateInfo); err != nil {
gLog.Println(LvERROR, rspBuf, " update info decode error:", err)
return
return err
}
if updateInfo.Error != 0 {
gLog.Println(LvERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail)
return
return err
}
err = updateFile(updateInfo.Url, "", "openp2p")
if err != nil {
gLog.Println(LvERROR, "update: download failed:", err)
return
return err
}
return nil
}
// todo rollback on error
@@ -68,8 +79,18 @@ func updateFile(url string, checksum string, dst string) error {
gLog.Printf(LvERROR, "OpenFile %s error:%s", tmpFile, err)
return err
}
caCertPool, err := x509.SystemCertPool()
if err != nil {
gLog.Println(LvERROR, "Failed to load system root CAs:", err)
} else {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM([]byte(rootCA))
caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1))
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false},
}
client := &http.Client{Transport: tr}
response, err := client.Get(url)
@@ -90,9 +111,13 @@ func updateFile(url string, checksum string, dst string) error {
gLog.Println(LvINFO, "download ", url, " ok")
gLog.Printf(LvINFO, "size: %d bytes", n)
err = os.Rename(os.Args[0], os.Args[0]+"0")
if err != nil && os.IsExist(err) {
gLog.Printf(LvINFO, " rename %s error:%s", os.Args[0], err)
err = os.Rename(os.Args[0], os.Args[0]+"0") // the old daemon process was using the 0 file, so it will prevent override it
if err != nil {
gLog.Printf(LvINFO, " rename %s error:%s, retry 1", os.Args[0], err)
err = os.Rename(os.Args[0], os.Args[0]+"1")
if err != nil {
gLog.Printf(LvINFO, " rename %s error:%s", os.Args[0], err)
}
}
// extract
gLog.Println(LvINFO, "extract files")
@@ -191,3 +216,14 @@ func extractTgz(dst, src string) error {
}
return nil
}
func cleanTempFiles() {
err := os.Remove(os.Args[0] + "0")
if err != nil {
gLog.Printf(LvDEBUG, " remove %s error:%s", os.Args[0]+"0", err)
}
err = os.Remove(os.Args[0] + "1")
if err != nil {
gLog.Printf(LvDEBUG, " remove %s error:%s", os.Args[0]+"0", err)
}
}

View File

@@ -6,7 +6,7 @@ RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
COPY get-client.sh /
ARG DOCKER_VER="latest"
RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh
ENTRYPOINT ["/openp2p"]

View File

@@ -1,5 +1,7 @@
#!/bin/sh
echo "Building version:${DOCKER_VER}"
echo "Running on platform: $TARGETPLATFORM"
# TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/')
echo "Running on platform: $TARGETPLATFORM"
@@ -23,7 +25,7 @@ sysType="linux-amd64"
sysType="linux-mipsbe"
fi
fi
url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz"
url="https://openp2p.cn/download/v1/${DOCKER_VER}/openp2p-latest.$sysType.tar.gz"
echo "download $url start"
if [ -f /usr/bin/curl ]; then

1
go.mod
View File

@@ -3,6 +3,7 @@ module openp2p
go 1.18
require (
github.com/emirpasic/gods v1.18.1
github.com/gorilla/websocket v1.4.2
github.com/openp2p-cn/go-reuseport v0.3.2
github.com/openp2p-cn/service v1.0.0