Compare commits

..

14 Commits

Author SHA1 Message Date
TenderIronh
e21adebc26 3.10.3 2023-08-09 22:52:29 +08:00
OpenP2P
b2a7619bd6 Merge pull request #43 from W192547975/fixbug
Bug fixing from 2023/8/1
2023-08-08 16:50:25 +08:00
W192547975
fe4022ba6c Update openp2p.go 2023-08-06 23:38:14 +08:00
W192547975
82c74b4f85 Update config.go 2023-08-06 23:31:17 +08:00
TenderIronh
0af65b7204 tls verify server and support docker 2023-08-03 23:05:45 +08:00
TenderIronh
46b4f78010 optimize tcp and udp punch 2023-07-29 20:36:35 +08:00
TenderIronh
8ebdf3341e 3.9.1 2023-07-21 22:25:33 +08:00
TenderIronh
b667e5b766 3.8.0 2023-05-07 20:42:06 +08:00
TenderIronh
cd415e7bf4 3.6.11 2023-03-25 12:00:27 +08:00
TenderIronh
67e3a8915a 3.6.8 2023-03-22 23:11:38 +08:00
TenderIronh
791d910314 3.6.5 2023-03-05 00:44:22 +08:00
TenderIronh
c3a43be3cc improve gatway and p2papp reconnect 2022-11-25 23:55:33 +08:00
TenderIronh
c8b8bf05a5 support openwrt and improve app and gateway reconnect time 2022-11-18 23:19:47 +08:00
hhd
8311341960 readloop error 2022-11-10 23:34:04 +08:00
38 changed files with 1314 additions and 655 deletions

View File

@@ -101,6 +101,22 @@ cd到代码根目录执行
```
make
```
手动编译特定系统和架构
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
比如linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
近期计划:
@@ -110,12 +126,12 @@ make
4. ~~建立网站用户可以在网站管理所有P2PApp和设备。查看设备在线状态升级增删查改重启P2PApp等~~(100%)
5. 建立公众号用户可在微信公众号管理所有P2PApp和设备
6. 客户端提供WebUI
7. 支持自有服务器,开源服务器程序
7. ~~支持自有服务器,开源服务器程序~~(100%)
8. 共享节点调度模型优化,对不同的运营商优化
9. 方便二次开发提供API和lib
10. 应用层支持UDP协议实现很简单但UDP应用较少暂不急(100%)
10. ~~应用层支持UDP协议实现很简单但UDP应用较少暂不急~~(100%)
11. 底层通信支持KCP协议目前仅支持QuicKCP专门对延时优化被游戏加速器广泛使用可以牺牲一定的带宽降低延时
12. 支持Android系统让旧手机焕发青春变成移动网关
12. ~~支持Android系统让旧手机焕发青春变成移动网关~~(100%)
13. 支持Windows网上邻居共享文件
14. 内网直连优化,用处不大,估计就用户测试时用到
15. ~~支持UPNP~~(100%)

View File

@@ -109,6 +109,23 @@ cd root directory of the socure code and execute
make
```
build specified os and arch.
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
For example linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
Short-Term:
1. ~~Support IPv6.~~(100%)
@@ -117,12 +134,12 @@ Short-Term:
4. ~~Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .~~(100%)
5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website.
6. Provide WebUI on client side.
7. Support private server, open source server program.
7. ~~Support private server, open source server program.~~(100%)
8. Optimize our share scheduling model for different network operators.
9. Provide REST APIs and libary for secondary development.
10. ~~Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.~~(100%)
11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag.
12. Support Android platform, let the phones to be mobile gateway.
12. ~~Support Android platform, let the phones to be mobile gateway.~~(100%)
13. Support SMB Windows neighborhood.
14. Direct connection on intranet, for testing.
15. ~~Support UPNP.~~(100%)

View File

@@ -96,4 +96,12 @@ firewall-cmd --state
C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos
sudo /usr/local/openp2p/openp2p uninstall
```
```
## Docker运行
```
# 把YOUR-TOKEN和YOUR-NODE-NAME替换成自己的
docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -98,4 +98,12 @@ firewall-cmd --state
C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos
sudo /usr/local/openp2p/openp2p uninstall
```
## Run with Docker
```
# Replace YOUR-TOKEN and YOUR-NODE-NAME with yours
docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest
OR
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

17
app/README.md Normal file
View File

@@ -0,0 +1,17 @@
## Build
```
cd core
go get -v golang.org/x/mobile/bind
gomobile bind -target android -v
if [[ $? -ne 0 ]]; then
echo "build error"
exit 9
fi
echo "build ok"
cp openp2p.aar openp2p-sources.jar ../app/app/libs
echo "copy to APP libs"
cd ../app
./gradlew build
```

View File

@@ -1,7 +1,9 @@
package main
import openp2p "openp2p/core"
import (
core "openp2p/core"
)
func main() {
openp2p.Run()
core.Run()
}

View File

@@ -20,7 +20,6 @@ import (
const MinNodeNameLen = 8
func getmac(ip string) string {
//get mac relative to the ip address which connected to the mq.
ifaces, err := net.Interfaces()
if err != nil {
return ""
@@ -138,8 +137,7 @@ func netInfo() *NetInfo {
continue
}
rsp := NetInfo{}
err = json.Unmarshal(buf[:n], &rsp)
if err != nil {
if err = json.Unmarshal(buf[:n], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong NetInfo:%s", err)
continue
}

View File

@@ -3,8 +3,10 @@ package openp2p
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"strconv"
"sync"
"time"
)
@@ -13,14 +15,15 @@ var gConf Config
type AppConfig struct {
// required
AppName string
Protocol string
SrcPort int
PeerNode string
DstPort int
DstHost string
PeerUser string
Enabled int // default:1
AppName string
Protocol string
Whitelist string
SrcPort int
PeerNode string
DstPort int
DstHost string
PeerUser string
Enabled int // default:1
// runtime info
peerVersion string
peerToken uint64
@@ -41,6 +44,10 @@ type AppConfig struct {
isUnderlayServer int // TODO: bool?
}
func (c *AppConfig) ID() string {
return fmt.Sprintf("%s%d", c.Protocol, c.SrcPort)
}
// TODO: add loglevel, maxlogfilesize
type Config struct {
Network NetworkConfig `json:"network"`
@@ -58,7 +65,18 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
c.Apps[i].Enabled = enabled
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
return
break
}
}
c.save()
}
func (c *Config) retryApp(peerNode string) {
c.mtx.Lock()
defer c.mtx.Unlock()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].PeerNode == peerNode {
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
}
}
}
@@ -66,6 +84,7 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
func (c *Config) add(app AppConfig, override bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
if app.SrcPort == 0 || app.DstPort == 0 {
gLog.Println(LvERROR, "invalid app ", app)
return
@@ -87,17 +106,19 @@ func (c *Config) delete(app AppConfig) {
}
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
c.Apps = append(c.Apps[:i], c.Apps[i+1:]...)
return
}
}
}
func (c *Config) save() {
c.mtx.Lock()
defer c.mtx.Unlock()
// c.mtx.Lock()
// defer c.mtx.Unlock() // internal call
data, _ := json.MarshalIndent(c, "", " ")
err := ioutil.WriteFile("config.json", data, 0644)
if err != nil {
@@ -128,24 +149,31 @@ func (c *Config) load() error {
return err
}
// TODO: deal with multi-thread r/w
func (c *Config) setToken(token uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.Network.Token = token
defer c.save()
if token != 0 {
c.Network.Token = token
}
}
func (c *Config) setUser(user string) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.User = user
}
func (c *Config) setNode(node string) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.Node = node
}
func (c *Config) setShareBandwidth(bw int) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.ShareBandwidth = bw
}
@@ -180,6 +208,7 @@ func parseParams(subCommand string) {
node := fset.String("node", "", "node name. 8-31 characters. if not set, it will be hostname")
peerNode := fset.String("peernode", "", "peer node name that you want to connect")
dstIP := fset.String("dstip", "127.0.0.1", "destination ip ")
whiteList := fset.String("whitelist", "", "whitelist for p2pApp ")
dstPort := fset.Int("dstport", 0, "destination port ")
srcPort := fset.Int("srcport", 0, "source port ")
tcpPort := fset.Int("tcpport", 0, "tcp port for upnp or publicip")
@@ -189,7 +218,7 @@ func parseParams(subCommand string) {
daemonMode := fset.Bool("d", false, "daemonMode")
notVerbose := fset.Bool("nv", false, "not log console")
newconfig := fset.Bool("newconfig", false, "not load existing config.json")
logLevel := fset.Int("loglevel", 0, "0:info 1:warn 2:error 3:debug")
logLevel := fset.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
if subCommand == "" { // no subcommand
fset.Parse(os.Args[1:])
} else {
@@ -199,6 +228,7 @@ func parseParams(subCommand string) {
config := AppConfig{Enabled: 1}
config.PeerNode = *peerNode
config.DstHost = *dstIP
config.Whitelist = *whiteList
config.DstPort = *dstPort
config.SrcPort = *srcPort
config.Protocol = *protocol
@@ -229,20 +259,20 @@ func parseParams(subCommand string) {
gConf.Network.TCPPort = *tcpPort
}
if f.Name == "token" {
gConf.Network.Token = *token
gConf.setToken(*token)
}
})
// set default value
if gConf.Network.ServerHost == "" {
gConf.Network.ServerHost = *serverHost
}
if *node != "" {
if len(*node) < MinNodeNameLen {
gLog.Println(LvERROR, ErrNodeTooShort)
os.Exit(9)
}
gConf.Network.Node = *node
} else {
envNode := os.Getenv("OPENP2P_NODE")
if envNode != "" {
gConf.Network.Node = envNode
}
if gConf.Network.Node == "" { // if node name not set. use os.Hostname
gConf.Network.Node = defaultNodeName()
}
@@ -254,7 +284,14 @@ func parseParams(subCommand string) {
}
gConf.Network.TCPPort = *tcpPort
}
if *token == 0 {
envToken := os.Getenv("OPENP2P_TOKEN")
if envToken != "" {
if n, err := strconv.ParseUint(envToken, 10, 64); n != 0 && err == nil {
gConf.setToken(n)
}
}
}
gConf.Network.ServerPort = *serverPort
gConf.Network.UDPPort1 = UDPPort1
gConf.Network.UDPPort2 = UDPPort2

View File

@@ -6,7 +6,7 @@ import (
"path/filepath"
"time"
"github.com/kardianos/service"
"github.com/openp2p-cn/service"
)
type daemon struct {
@@ -44,9 +44,9 @@ func (d *daemon) run() {
}
gLog.Println(LvINFO, mydir)
conf := &service.Config{
Name: ProducnName,
DisplayName: ProducnName,
Description: ProducnName,
Name: ProductName,
DisplayName: ProductName,
Description: ProductName,
Executable: binPath,
}
@@ -95,9 +95,9 @@ func (d *daemon) run() {
func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) error {
svcConfig := &service.Config{
Name: ProducnName,
DisplayName: ProducnName,
Description: ProducnName,
Name: ProductName,
DisplayName: ProductName,
Description: ProductName,
Executable: exeAbsPath,
Arguments: args,
}

View File

@@ -8,12 +8,14 @@ import (
var (
// ErrorS2S string = "s2s is not supported"
// ErrorHandshake string = "handshake error"
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrNetwork = errors.New("network error")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
)

View File

@@ -8,7 +8,10 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"time"
"github.com/openp2p-cn/totp"
)
func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
@@ -20,62 +23,10 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead)
switch subType {
case MsgPushConnectReq: // TODO: handle a msg move to a new function
req := PushConnectReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvINFO, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
if VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts
VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()) {
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
break
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
err = handleConnectReq(pn, subType, msg)
case MsgPushRsp:
rsp := PushRsp{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong pushRsp:%s", err)
return err
}
@@ -86,9 +37,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
case MsgPushAddRelayTunnelReq:
req := AddRelayTunnelReq{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
config := AppConfig{}
@@ -101,13 +51,11 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
msg := TunnelMsg{ID: t.id}
pn.push(r.From, MsgPushAddRelayTunnelRsp, msg)
}
}(req)
case MsgPushAPPKey:
req := APPKeySync{}
err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong APPKeySync:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
SaveKey(req.AppID, req.AppKey)
@@ -132,137 +80,27 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
os.Exit(0)
return err
case MsgPushReportApps:
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
pn.write(MsgReport, MsgReportApps, &req)
err = handleReportApps(pn, subType, msg)
case MsgPushReportLog:
gLog.Println(LvINFO, "MsgPushReportLog")
req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushReportLog:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
break
}
fi, err := f.Stat()
if err != nil {
break
}
if req.Offset == 0 && fi.Size() > 4096 {
req.Offset = fi.Size() - 4096
}
if req.Len <= 0 {
req.Len = 4096
}
f.Seek(req.Offset, 0)
if req.Len > 1024*1024 { // too large
break
}
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
break
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
pn.write(MsgReport, MsgPushReportLog, &rsp)
err = handleLog(pn, subType, msg)
case MsgPushEditApp:
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
gConf.save() // save quickly for the next request reportApplist
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
err = handleEditApp(pn, subType, msg)
case MsgPushEditNode:
gLog.Println(LvINFO, "MsgPushEditNode")
req := EditNode{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
gConf.save()
// TODO: hot reload
os.Exit(0)
case MsgPushSwitchApp:
gLog.Println(LvINFO, "MsgPushSwitchApp")
app := AppInfo{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:]))
if err = json.Unmarshal(msg[openP2PHeaderSize:], &app); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(app), err, string(msg[openP2PHeaderSize:]))
return err
}
config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol}
@@ -272,11 +110,197 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
req := PushDstNodeOnline{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", req.Node)
gConf.retryApp(req.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
pn.msgMapMtx.Unlock()
ch <- msg
}
return nil
return err
}
func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushEditApp")
newApp := AppInfo{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &newApp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(newApp), err, string(msg[openP2PHeaderSize:]))
return err
}
oldConf := AppConfig{Enabled: 1}
// protocol0+srcPort0 exist, delApp
oldConf.AppName = newApp.AppName
oldConf.Protocol = newApp.Protocol0
oldConf.Whitelist = newApp.Whitelist
oldConf.SrcPort = newApp.SrcPort0
oldConf.PeerNode = newApp.PeerNode
oldConf.DstHost = newApp.DstHost
oldConf.DstPort = newApp.DstPort
gConf.delete(oldConf)
// AddApp
newConf := oldConf
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
return nil
// autoReconnect will auto AddApp
// pn.AddApp(config)
// TODO: report result
}
func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
req := PushConnectReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
return err
}
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
rsp := PushConnectRsp{
Error: 10,
Detail: ErrVersionNotCompatible.Error(),
To: req.From,
From: pn.config.Node,
}
pn.push(req.From, MsgPushConnectRsp, rsp)
return ErrVersionNotCompatible
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt/int64(time.Second)) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
config.peerConeNatPort = req.ConeNatPort
config.peerIP = req.FromIP
config.PeerNode = req.From
config.peerVersion = req.Version
config.fromToken = req.Token
config.peerIPv6 = req.IPv6
config.hasIPv4 = req.HasIPv4
config.hasUPNPorNATPMP = req.HasUPNPorNATPMP
config.linkMode = req.LinkMode
config.isUnderlayServer = req.IsUnderlayServer
// share relay node will limit bandwidth
if req.Token != pn.config.Token {
gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth)
config.shareBandwidth = pn.config.ShareBandwidth
}
// go pn.AddTunnel(config, req.ID)
go pn.addDirectTunnel(config, req.ID)
return nil
}
gLog.Println(LvERROR, "Access Denied:", req.From)
rsp := PushConnectRsp{
Error: 1,
Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node),
To: req.From,
From: pn.config.Node,
}
return pn.push(req.From, MsgPushConnectRsp, rsp)
}
func handleReportApps(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvINFO, "MsgPushReportApps")
req := ReportApps{}
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
linkMode := LinkModeUDPPunch
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
linkMode = app.tunnel.linkModeWeb
}
appInfo := AppInfo{
AppName: config.AppName,
Error: config.errMsg,
Protocol: config.Protocol,
Whitelist: config.Whitelist,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
LinkMode: linkMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,
PeerUser: config.PeerUser,
PeerIP: config.peerIP,
PeerNatType: config.peerNatType,
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
IsActive: appActive,
Enabled: config.Enabled,
}
req.Apps = append(req.Apps, appInfo)
}
return pn.write(MsgReport, MsgReportApps, &req)
}
func handleLog(pn *P2PNetwork, subType uint16, msg []byte) (err error) {
gLog.Println(LvDEBUG, "MsgPushReportLog")
const defaultLen = 1024 * 128
const maxLen = 1024 * 1024
req := ReportLogReq{}
if err = json.Unmarshal(msg[openP2PHeaderSize:], &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s %s", reflect.TypeOf(req), err, string(msg[openP2PHeaderSize:]))
return err
}
if req.FileName == "" {
req.FileName = "openp2p.log"
}
f, err := os.Open(filepath.Join("log", req.FileName))
if err != nil {
gLog.Println(LvERROR, "read log file error:", err)
return err
}
fi, err := f.Stat()
if err != nil {
return err
}
if req.Offset > fi.Size() {
req.Offset = fi.Size() - defaultLen
}
// verify input parameters
if req.Offset < 0 {
req.Offset = 0
}
if req.Len <= 0 || req.Len > maxLen {
req.Len = defaultLen
}
f.Seek(req.Offset, 0)
buff := make([]byte, req.Len)
readLength, err := f.Read(buff)
f.Close()
if err != nil {
gLog.Println(LvERROR, "read log content error:", err)
return err
}
rsp := ReportLogRsp{}
rsp.Content = string(buff[:readLength])
rsp.FileName = req.FileName
rsp.Total = fi.Size()
rsp.Len = req.Len
return pn.write(MsgReport, MsgPushReportLog, &rsp)
}

View File

@@ -23,11 +23,11 @@ func handshakeC2C(t *P2PTunnel) (err error) {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err)
return err
}
ra, head, _, _, err := UDPRead(conn, 5000)
ra, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil {
time.Sleep(time.Millisecond * 200)
gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read")
ra, head, _, _, err = UDPRead(conn, 5000)
ra, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
@@ -38,7 +38,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
return err
@@ -66,7 +66,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end")
// even if read timeout, continue handshake
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la)
@@ -92,7 +92,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Println(LvDEBUG, "send symmetric handshake end")
return nil
}()
deadline := time.Now().Add(SymmetricHandshakeAckTimeout)
deadline := time.Now().Add(HandshakeTimeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
@@ -140,7 +140,7 @@ func handshakeS2C(t *P2PTunnel) error {
}
defer conn.Close()
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
_, head, _, _, err := UDPRead(conn, 10000)
_, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil {
// gLog.Println(LevelDEBUG, "one of the handshake error:", err)
return err
@@ -155,7 +155,7 @@ func handshakeS2C(t *P2PTunnel) error {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeS2C handshake error")
return err
@@ -174,7 +174,7 @@ func handshakeS2C(t *P2PTunnel) error {
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
select {
case <-time.After(SymmetricHandshakeAckTimeout):
case <-time.After(HandshakeTimeout):
return fmt.Errorf("wait handshake failed")
case la := <-gotCh:
gLog.Println(LvDEBUG, "symmetric handshake ok", la)

View File

@@ -17,7 +17,7 @@ import (
// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 -peernode hhdhome-n1 -dstip 127.0.0.1 -dstport 50022 -protocol tcp -srcport 22
func install() {
gLog.Println(LvINFO, "openp2p start. version: ", OpenP2PVersion)
gLog.Println(LvINFO, "Contact: QQ: 16947733, Email: openp2p.cn@gmail.com")
gLog.Println(LvINFO, "Contact: QQ group 16947733, Email openp2p.cn@gmail.com")
gLog.Println(LvINFO, "install start")
defer gLog.Println(LvINFO, "install end")
// auto uninstall
@@ -74,6 +74,7 @@ func install() {
} else {
gLog.Println(LvINFO, "start openp2p service ok.")
}
gLog.Println(LvINFO, "Visit WebUI on https://console.openp2p.cn")
}
func installByFilename() {

155
core/iptree.go Normal file
View File

@@ -0,0 +1,155 @@
package openp2p
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"net"
"strings"
"sync"
"github.com/emirpasic/gods/trees/avltree"
"github.com/emirpasic/gods/utils"
)
type IPTree struct {
tree *avltree.Tree
treeMtx sync.RWMutex
}
// add 120k cost 0.5s
func (iptree *IPTree) AddIntIP(minIP uint32, maxIP uint32) bool {
if minIP > maxIP {
return false
}
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
newMinIP := minIP
newMaxIP := maxIP
cur := iptree.tree.Root
for {
if cur == nil {
break
}
curMaxIP := cur.Value.(uint32)
curMinIP := cur.Key.(uint32)
// newNode all in existNode, treat as inserted.
if newMinIP >= curMinIP && newMaxIP <= curMaxIP {
return true
}
// has no interset
if newMinIP > curMaxIP {
cur = cur.Children[1]
continue
}
if newMaxIP < curMinIP {
cur = cur.Children[0]
continue
}
// has interset, rm it and Add the new merged ip segment
iptree.tree.Remove(curMinIP)
if curMinIP < newMinIP {
newMinIP = curMinIP
}
if curMaxIP > newMaxIP {
newMaxIP = curMaxIP
}
cur = iptree.tree.Root
}
// put in the tree
iptree.tree.Put(newMinIP, newMaxIP)
return true
}
func (iptree *IPTree) Add(minIPStr string, maxIPStr string) bool {
var minIP, maxIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP(minIPStr).To4()), binary.BigEndian, &minIP)
binary.Read(bytes.NewBuffer(net.ParseIP(maxIPStr).To4()), binary.BigEndian, &maxIP)
return iptree.AddIntIP(minIP, maxIP)
}
func (iptree *IPTree) Contains(ipStr string) bool {
var ip uint32
binary.Read(bytes.NewBuffer(net.ParseIP(ipStr).To4()), binary.BigEndian, &ip)
return iptree.ContainsInt(ip)
}
func (iptree *IPTree) ContainsInt(ip uint32) bool {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
if iptree.tree == nil {
return false
}
n := iptree.tree.Root
for n != nil {
curMaxIP := n.Value.(uint32)
curMinIP := n.Key.(uint32)
switch {
case ip >= curMinIP && ip <= curMaxIP: // hit
return true
case ip < curMinIP:
n = n.Children[0]
default:
n = n.Children[1]
}
}
return false
}
func (iptree *IPTree) Size() int {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
return iptree.tree.Size()
}
func (iptree *IPTree) Print() {
iptree.treeMtx.RLock()
defer iptree.treeMtx.RUnlock()
log.Println("size:", iptree.Size())
log.Println(iptree.tree.String())
}
func (iptree *IPTree) Clear() {
iptree.treeMtx.Lock()
defer iptree.treeMtx.Unlock()
iptree.tree.Clear()
}
// input format 127.0.0.1,192.168.1.0/24,10.1.1.30-10.1.1.50
// 127.0.0.1
// 192.168.1.0/24
// 192.168.1.1-192.168.1.10
func NewIPTree(ips string) *IPTree {
iptree := &IPTree{
tree: avltree.NewWith(utils.UInt32Comparator),
}
ipArr := strings.Split(ips, ",")
for _, ip := range ipArr {
if strings.Contains(ip, "/") { // x.x.x.x/24
_, ipNet, err := net.ParseCIDR(ip)
if err != nil {
fmt.Println("Error parsing CIDR:", err)
continue
}
minIP := ipNet.IP.Mask(ipNet.Mask).String()
maxIP := calculateMaxIP(ipNet).String()
iptree.Add(minIP, maxIP)
} else if strings.Contains(ip, "-") { // x.x.x.x-y.y.y.y
minAndMax := strings.Split(ip, "-")
iptree.Add(minAndMax[0], minAndMax[1])
} else { // single ip
iptree.Add(ip, ip)
}
}
return iptree
}
func calculateMaxIP(ipNet *net.IPNet) net.IP {
maxIP := make(net.IP, len(ipNet.IP))
copy(maxIP, ipNet.IP)
for i := range maxIP {
maxIP[i] |= ^ipNet.Mask[i]
}
return maxIP
}

171
core/iptree_test.go Normal file
View File

@@ -0,0 +1,171 @@
package openp2p
import (
"bytes"
"encoding/binary"
"net"
"testing"
)
func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func wrapBenchmarkContains(t *testing.B, iptree *IPTree, ip string, result bool) {
if iptree.Contains(ip) == result {
// t.Logf("compare version %s %s ok\n", v1, v2)
} else {
t.Errorf("test %s fail\n", ip)
}
}
func TestAllInputFormat(t *testing.T) {
iptree := NewIPTree("219.137.185.70,127.0.0.1,127.0.0.0/8,192.168.1.0/24,192.168.3.100-192.168.3.255,192.168.100.0-192.168.200.255")
wrapTestContains(t, iptree, "127.0.0.1", true)
wrapTestContains(t, iptree, "127.0.0.2", true)
wrapTestContains(t, iptree, "127.1.1.1", true)
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
wrapTestContains(t, iptree, "192.168.1.2", true)
wrapTestContains(t, iptree, "192.168.2.2", false)
wrapTestContains(t, iptree, "192.168.3.1", false)
wrapTestContains(t, iptree, "192.168.3.100", true)
wrapTestContains(t, iptree, "192.168.3.255", true)
wrapTestContains(t, iptree, "192.168.150.1", true)
wrapTestContains(t, iptree, "192.168.250.1", false)
}
func TestSingleIP(t *testing.T) {
iptree := NewIPTree("")
iptree.Add("219.137.185.70", "219.137.185.70")
wrapTestContains(t, iptree, "219.137.185.70", true)
wrapTestContains(t, iptree, "219.137.185.71", false)
}
func TestWrongSegment(t *testing.T) {
iptree := NewIPTree("")
inserted := iptree.Add("87.251.75.0", "82.251.75.255")
if inserted {
t.Errorf("TestWrongSegment failed\n")
}
}
func TestSegment2(t *testing.T) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Print()
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Print()
iptree.Add("10.1.1.90", "10.1.1.110") // interset
iptree.Print()
t.Logf("blocklist size:%d\n", iptree.Size())
wrapTestContains(t, iptree, "10.1.1.40", true)
wrapTestContains(t, iptree, "10.1.5.50", true)
wrapTestContains(t, iptree, "10.1.6.50", true)
wrapTestContains(t, iptree, "10.1.7.50", true)
wrapTestContains(t, iptree, "10.1.2.50", true)
wrapTestContains(t, iptree, "10.1.3.50", true)
wrapTestContains(t, iptree, "10.1.1.60", true)
wrapTestContains(t, iptree, "10.1.1.90", true)
wrapTestContains(t, iptree, "10.1.1.110", true)
wrapTestContains(t, iptree, "10.1.1.250", true)
wrapTestContains(t, iptree, "10.1.2.60", true)
wrapTestContains(t, iptree, "10.1.100.30", false)
wrapTestContains(t, iptree, "10.1.200.30", false)
iptree.Add("10.0.0.0", "10.255.255.255") // will merge all segment
iptree.Print()
if iptree.Size() != 1 {
t.Errorf("merge ip segment error\n")
}
}
func BenchmarkBuildBlockList20k(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 1)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("blocklist size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("blocklist size:%d\n", iptree.Size())
iptree.Clear()
t.Logf("clear. blocklist size:%d\n", iptree.Size())
}
func BenchmarkQuery(t *testing.B) {
iptree := NewIPTree("")
iptree.Clear()
iptree.Add("10.1.5.50", "10.1.5.100")
iptree.Add("10.1.1.50", "10.1.1.100")
iptree.Add("10.1.2.50", "10.1.2.100")
iptree.Add("10.1.6.50", "10.1.6.100")
iptree.Add("10.1.7.50", "10.1.7.100")
iptree.Add("10.1.3.50", "10.1.3.100")
iptree.Add("10.1.1.1", "10.1.1.10") // no interset
iptree.Add("10.1.1.200", "10.1.1.250") // no interset
iptree.Add("10.1.1.80", "10.1.1.90") // all in
iptree.Add("10.1.1.40", "10.1.1.60") // interset
iptree.Add("10.1.1.90", "10.1.1.110") // interset
var minIP uint32
binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 10k block ip single
nodeNum := uint32(10000 * 100)
gap := uint32(10)
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i)
// t.Logf("blocklist size:%d\n", iptree.Size())
}
binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP)
// insert 100k block ip segment
for i := minIP; i < minIP+nodeNum*gap; i += gap {
iptree.AddIntIP(i, i+5)
}
t.Logf("blocklist size:%d\n", iptree.Size())
t.ResetTimer()
queryNum := 100 * 10000
for i := 0; i < queryNum; i++ {
iptree.ContainsInt(minIP + uint32(i))
wrapBenchmarkContains(t, iptree, "10.1.5.55", true)
wrapBenchmarkContains(t, iptree, "10.1.1.1", true)
wrapBenchmarkContains(t, iptree, "10.1.5.200", false)
wrapBenchmarkContains(t, iptree, "200.1.1.1", false)
}
t.Logf("query list:%d\n", queryNum*4)
}

View File

@@ -51,6 +51,7 @@ type logger struct {
pid int
maxLogSize int64
mode int
stdLogger *log.Logger
}
func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger {
@@ -67,13 +68,13 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
os.MkdirAll(logdir, 0777)
for lv := range logFileNames {
logFilePath := logdir + filePrefix + logFileNames[lv]
f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Fatal(err)
}
os.Chmod(logFilePath, 0666)
os.Chmod(logFilePath, 0644)
logfiles[lv] = f
loggers[lv] = log.New(f, "", log.LstdFlags)
loggers[lv] = log.New(f, "", log.LstdFlags|log.Lmicroseconds)
}
var le string
if runtime.GOOS == "windows" {
@@ -81,7 +82,8 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
} else {
le = "\n"
}
pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode}
pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode, log.New(os.Stdout, "", 0)}
pLog.stdLogger.SetFlags(log.LstdFlags | log.Lmicroseconds)
go pLog.checkFile()
return pLog
}
@@ -119,7 +121,7 @@ func (l *logger) checkFile() {
backupPath := l.logDir + fname + ".0"
os.Remove(backupPath)
os.Rename(l.logDir+fname, backupPath)
newFile, e := os.OpenFile(l.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
newFile, e := os.OpenFile(l.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if e == nil {
l.loggers[lv].SetOutput(newFile)
l.files[lv] = newFile
@@ -142,7 +144,7 @@ func (l *logger) Printf(level LogLevel, format string, params ...interface{}) {
l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Printf("%d %s "+format+l.lineEnding, params...)
l.stdLogger.Printf("%d %s "+format+l.lineEnding, params...)
}
}
@@ -159,6 +161,6 @@ func (l *logger) Println(level LogLevel, params ...interface{}) {
l.loggers[0].Print(params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Print(params...)
l.stdLogger.Print(params...)
}
}

View File

@@ -3,7 +3,6 @@ package openp2p
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"strconv"
@@ -14,27 +13,29 @@ import (
reuse "github.com/openp2p-cn/go-reuseport"
)
func natTCP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int) {
func natTCP(serverHost string, serverPort int) (publicIP string, publicPort int, localPort int) {
// dialer := &net.Dialer{
// LocalAddr: &net.TCPAddr{
// IP: net.ParseIP("0.0.0.0"),
// Port: localPort,
// },
// }
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", localPort), fmt.Sprintf("%s:%d", serverHost, serverPort), time.Second*5)
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", 0), fmt.Sprintf("%s:%d", serverHost, serverPort), NatTestTimeout)
// conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort))
// log.Println(LvINFO, conn.LocalAddr())
if err != nil {
fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err)
return
}
defer conn.Close()
localPort, _ = strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1])
_, wrerr := conn.Write([]byte("1"))
if wrerr != nil {
fmt.Printf("Write error: %s\n", wrerr)
return
}
b := make([]byte, 1000)
conn.SetReadDeadline(time.Now().Add(time.Second * 5))
conn.SetReadDeadline(time.Now().Add(NatTestTimeout))
n, rderr := conn.Read(b)
if rderr != nil {
fmt.Printf("Read error: %s\n", rderr)
@@ -83,7 +84,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
return "", 0, err
}
natRsp := NatDetectRsp{}
err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp)
return natRsp.IP, natRsp.Port, nil
}
@@ -91,7 +92,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasIPvr int, hasUPNPorNATPMP int, err error) {
// the random local port may be used by other.
localPort := int(rand.Uint32()%15000 + 50000)
echoPort := P2PNetworkInstance(nil).config.TCPPort
echoPort := gConf.Network.TCPPort
ip1, port1, err := natTest(host, udp1, localPort)
if err != nil {
return "", 0, 0, 0, err
@@ -120,6 +121,7 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort})
if err != nil {
gLog.Println(LvERROR, "echo server listen error:", err)
wg.Done()
return
}
buf := make([]byte, 1600)
@@ -134,7 +136,10 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
echoConn.WriteToUDP(buf[0:n], addr)
gLog.Println(LvDEBUG, "echo server end")
}()
wg.Wait() // wait echo udp
wg.Wait() // wait echo udp
if echoConn == nil { // listen error
return
}
defer echoConn.Close()
// testing for public ip
for i := 0; i < 2; i++ {
@@ -151,14 +156,14 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
gLog.Println(LvDEBUG, "could not perform UPNP external address:", err)
break
}
log.Println("PublicIP:", ext)
gLog.Println(LvINFO, "PublicIP:", ext)
externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30)
externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30) // 30 seconds fot upnp testing
if err != nil {
gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort)
break
} else {
nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800)
nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days for tcp connection
}
}
gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort)

View File

@@ -13,7 +13,7 @@ func Run() {
rand.Seed(time.Now().UnixNano())
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
// TODO: install sub command, deamon process
if len(os.Args) > 1 {
switch os.Args[1] {
@@ -21,7 +21,6 @@ func Run() {
fmt.Println(OpenP2PVersion)
return
case "update":
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
d := daemon{}
err := d.Control("restart", targetPath, nil)
@@ -53,6 +52,10 @@ func Run() {
gLog.Println(LvINFO, &gConf)
setFirewall()
err := setRLimit()
if err != nil {
gLog.Println(LvINFO, "setRLimit error:", err)
}
network := P2PNetworkInstance(&gConf.Network)
if ok := network.Connect(30000); !ok {
gLog.Println(LvERROR, "P2PNetwork login error")
@@ -70,7 +73,7 @@ var network *P2PNetwork
func RunAsModule(baseDir string, token string, bw int, logLevel int) *P2PNetwork {
rand.Seed(time.Now().UnixNano())
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
parseParams("")

View File

@@ -35,24 +35,23 @@ type overlayConn struct {
// for udp
connUDP *net.UDPConn
remoteAddr net.Addr
udpRelayData chan []byte
udpData chan []byte
lastReadUDPTs time.Time
}
func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "%d overlayConn run start", oConn.id)
defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id)
oConn.running = true
oConn.lastReadUDPTs = time.Now()
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
buffer := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
reuseBuff := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
for oConn.running && oConn.tunnel.isRuning() {
buff, dataLen, err := oConn.Read(readBuf)
readBuff, dataLen, err := oConn.Read(reuseBuff)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
@@ -61,9 +60,9 @@ func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
break
}
payload := buff[:dataLen]
payload := readBuff[:dataLen]
if oConn.appKey != 0 {
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, readBuff[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
@@ -85,20 +84,22 @@ func (oConn *overlayConn) run() {
}
oConn.tunnel.overlayConns.Delete(oConn.id)
// notify peer disconnect
if oConn.isClient {
req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
}
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) {
if !oConn.running {
err = ErrOverlayConnDisconnect
return
}
if oConn.connUDP != nil {
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
err = errors.New("udp close")
@@ -106,15 +107,15 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
if oConn.remoteAddr != nil { // as server
select {
case buff = <-oConn.udpRelayData:
n = len(buff)
case buff = <-oConn.udpData:
dataLen = len(buff) - PaddingSize
oConn.lastReadUDPTs = time.Now()
case <-time.After(time.Second * 10):
err = ErrDeadlineExceeded
}
} else { // as client
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
oConn.connUDP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
dataLen, _, err = oConn.connUDP.ReadFrom(reuseBuff)
if err == nil {
oConn.lastReadUDPTs = time.Now()
}
@@ -122,15 +123,21 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
return
}
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
if oConn.connTCP != nil {
oConn.connTCP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
dataLen, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
}
return
}
// calling by p2pTunnel
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
if !oConn.running {
return 0, ErrOverlayConnDisconnect
}
if oConn.connUDP != nil {
if oConn.remoteAddr == nil {
n, err = oConn.connUDP.Write(buff)
@@ -142,9 +149,25 @@ func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
}
return
}
n, err = oConn.connTCP.Write(buff)
if oConn.connTCP != nil {
n, err = oConn.connTCP.Write(buff)
}
if err != nil {
oConn.running = false
}
return
}
func (oConn *overlayConn) Close() (err error) {
oConn.running = false
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
return nil
}

View File

@@ -17,6 +17,7 @@ type p2pApp struct {
listener net.Listener
listenerUDP *net.UDPConn
tunnel *P2PTunnel
iptree *IPTree
rtid uint64 // relay tunnelID
relayNode string
relayMode string
@@ -51,7 +52,7 @@ func (app *p2pApp) listenTCP() error {
gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort)
var err error
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
app.listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) // support tcp4 and tcp6
if err != nil {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
@@ -64,6 +65,15 @@ func (app *p2pApp) listenTCP() error {
}
break
}
// check white list
if app.config.Whitelist != "" {
remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0]
if !app.iptree.Contains(remoteIP) {
conn.Close()
gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP)
continue
}
}
oConn := overlayConn{
tunnel: app.tunnel,
connTCP: conn,
@@ -72,6 +82,7 @@ func (app *p2pApp) listenTCP() error {
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// pre-calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -81,7 +92,7 @@ func (app *p2pApp) listenTCP() error {
oConn.appKeyBytes = encryptKey
}
app.tunnel.overlayConns.Store(oConn.id, &oConn)
gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d", oConn.id)
gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d, %s", oConn.id, oConn.connTCP.RemoteAddr())
// tell peer connect
req := OverlayConnectReq{ID: oConn.id,
Token: app.tunnel.pn.config.Token,
@@ -100,6 +111,8 @@ func (app *p2pApp) listenTCP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
}
return nil
@@ -114,10 +127,10 @@ func (app *p2pApp) listenUDP() error {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
}
buffer := make([]byte, 64*1024)
buffer := make([]byte, 64*1024+PaddingSize)
udpID := make([]byte, 8)
for {
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
app.listenerUDP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
len, remoteAddr, err := app.listenerUDP.ReadFrom(buffer)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
@@ -127,8 +140,8 @@ func (app *p2pApp) listenUDP() error {
break
}
} else {
b := bytes.Buffer{}
b.Write(buffer[:len])
dupData := bytes.Buffer{} // should uses memory pool
dupData.Write(buffer[:len+PaddingSize])
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
@@ -139,19 +152,20 @@ func (app *p2pApp) listenUDP() error {
udpID[3] = a[3]
udpID[4] = byte(port)
udpID[5] = byte(port >> 8)
id := binary.LittleEndian.Uint64(udpID)
id := binary.LittleEndian.Uint64(udpID) // convert remoteIP:port to uint64
s, ok := app.tunnel.overlayConns.Load(id)
if !ok {
oConn := overlayConn{
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpRelayData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -180,8 +194,10 @@ func (app *p2pApp) listenUDP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
oConn.udpRelayData <- b.Bytes()
oConn.udpData <- dupData.Bytes()
}
// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
@@ -189,7 +205,7 @@ func (app *p2pApp) listenUDP() error {
if !ok {
continue
}
overlayConn.udpRelayData <- b.Bytes()
overlayConn.udpData <- dupData.Bytes()
}
}
return nil
@@ -204,12 +220,15 @@ func (app *p2pApp) listen() error {
if app.rtid != 0 {
go app.relayHeartbeatLoop()
}
for app.tunnel.isRuning() && app.running {
for app.tunnel.isRuning() {
if app.config.Protocol == "udp" {
app.listenUDP()
} else {
app.listenTCP()
}
if !app.running {
break
}
time.Sleep(time.Second * 10)
}
return nil

View File

@@ -3,14 +3,15 @@ package openp2p
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
@@ -23,16 +24,26 @@ var (
once sync.Once
)
const (
retryLimit = 20
retryInterval = 10 * time.Second
dtma = 20
ddtma = 5
)
type P2PNetwork struct {
conn *websocket.Conn
online bool
running bool
restartCh chan bool
wg sync.WaitGroup
writeMtx sync.Mutex
serverTs int64
localTs int64
hbTime time.Time
conn *websocket.Conn
online bool
running bool
restartCh chan bool
wgReconnect sync.WaitGroup
writeMtx sync.Mutex
hbTime time.Time
// for sync server time
t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds
dtma int64
ddt int64 // differential of dt
// msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID
msgMapMtx sync.Mutex
@@ -51,6 +62,8 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
running: true,
msgMap: make(map[uint64]chan []byte),
limiter: newBandwidthLimiter(config.ShareBandwidth),
dt: 0,
ddt: 0,
}
instance.msgMap[0] = make(chan []byte) // for gateway
if config != nil {
@@ -64,18 +77,18 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
}
func (pn *P2PNetwork) run() {
go pn.readLoop()
go pn.autorunApp()
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "")
for pn.running {
select {
case <-heartbeatTimer.C: // TODO: deal with connect failed, no send hb
case <-heartbeatTimer.C:
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "")
case <-pn.restartCh:
pn.online = false
pn.wg.Wait() // wait read/write goroutine exited
time.Sleep(NetworkHeartbeatTime)
pn.wgReconnect.Wait() // wait read/autorunapp goroutine end
time.Sleep(ClientAPITimeout)
err := pn.init()
if err != nil {
gLog.Println(LvERROR, "P2PNetwork init error:", err)
@@ -85,7 +98,7 @@ func (pn *P2PNetwork) run() {
}
func (pn *P2PNetwork) Connect(timeout int) bool {
// waiting for login response
// waiting for heartbeat
for i := 0; i < (timeout / 1000); i++ {
if pn.hbTime.After(time.Now().Add(-NetworkHeartbeatTime)) {
return true
@@ -101,62 +114,50 @@ func (pn *P2PNetwork) runAll() {
allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps
for _, config := range allApps {
if config.nextRetryTime.After(time.Now()) {
continue
}
if config.Enabled == 0 {
if config.nextRetryTime.After(time.Now()) || config.Enabled == 0 || config.retryNum >= retryLimit {
continue
}
if config.AppName == "" {
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)
config.AppName = config.ID()
}
appExist := false
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
appExist = true
if app.isActive() {
if i, ok := pn.apps.Load(config.ID()); ok {
if app := i.(*p2pApp); app.isActive() {
continue
}
}
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum > 0 {
if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // run normally 15min, reset retrynum
config.retryNum = 0
}
}
config.retryNum++
config.retryTime = time.Now()
increase := math.Pow(1.5, float64(config.retryNum)) // exponential increase retry time. 1.5^x
if increase > 900 {
increase = 900
config.Enabled = 0
gLog.Printf(LvWARN, "app %s has stopped retry, manually enable it on Web console", config.AppName)
continue
}
config.nextRetryTime = time.Now().Add(time.Second * time.Duration(increase))
config.nextRetryTime = time.Now().Add(retryInterval)
config.connectTime = time.Now()
config.peerToken = pn.config.Token
gConf.mtx.Unlock() // AddApp will take a period of time
gConf.mtx.Unlock() // AddApp will take a period of time, let outside modify gConf
err := pn.AddApp(*config)
gConf.mtx.Lock()
if err != nil {
config.errMsg = err.Error()
if err == ErrPeerOffline { // stop retry, waiting for online
config.retryNum = retryLimit
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", config.PeerNode)
}
}
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start")
for pn.running {
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running && pn.online {
time.Sleep(time.Second)
if !pn.online {
continue
}
pn.runAll()
time.Sleep(time.Second * 10)
}
gLog.Println(LvINFO, "autorunApp end")
}
@@ -166,14 +167,12 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode)
// request a relay node or specify manually(TODO)
pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode})
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10)
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout)
if head == nil {
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err := json.Unmarshal(body, &rsp); err != nil {
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
@@ -206,9 +205,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error")
}
rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err)
if err = json.Unmarshal(body, &rspID); err != nil {
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
return t, rspID.ID, rsp.Mode, err
@@ -223,7 +220,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
}
// check if app already exist?
appExist := false
_, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
_, ok := pn.apps.Load(config.ID())
if ok {
appExist = true
}
@@ -289,11 +286,13 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
key: appKey,
tunnel: t,
config: config,
iptree: NewIPTree(config.Whitelist),
rtid: rtid,
relayNode: relayNode,
relayMode: relayMode,
hbTime: time.Now()}
pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app)
pn.apps.Store(config.ID(), &app)
gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id)
if err == nil {
go app.listen()
}
@@ -304,16 +303,37 @@ func (pn *P2PNetwork) DeleteApp(config AppConfig) {
gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort)
defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort)
// close the apps of this config
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
i, ok := pn.apps.Load(config.ID())
if ok {
app := i.(*p2pApp)
gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
gLog.Printf(LvINFO, "app %s exist, delete it", app.config.AppName)
app.close()
pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
pn.apps.Delete(config.ID())
}
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) {
func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) {
// find existing tunnel to peer
pn.allTunnels.Range(func(id, i interface{}) bool {
tmpt := i.(*P2PTunnel)
if tmpt.config.PeerNode == config.PeerNode {
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := tmpt.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
tmpt.close()
} else {
t = tmpt
}
return false
}
return true
})
return t
}
func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) {
gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort)
isClient := false
@@ -322,63 +342,34 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
tid = rand.Uint64()
isClient = true
}
exist := false
// find existing tunnel to peer
var t *P2PTunnel
pn.allTunnels.Range(func(id, i interface{}) bool {
t = i.(*P2PTunnel)
if t.config.PeerNode == config.PeerNode {
// server side force close existing tunnel
if !isClient {
t.close()
return false
}
// client side checking
gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode)
isActive := t.checkActive()
// inactive, close it
if !isActive {
gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode)
t.close()
} else {
// active
exist = true
}
return false
}
return true
})
if exist {
if t = pn.findTunnel(&config); t != nil {
return t, nil
}
// create tunnel if not exist
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
pn.msgMapMtx.Lock()
pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50)
pn.msgMapMtx.Unlock()
// server side
if !isClient {
err := pn.newTunnel(t, tid, isClient)
t, err = pn.newTunnel(config, tid, isClient)
return t, err // always return
}
// client side
// peer info
initErr := t.requestPeerInfo()
initErr := pn.requestPeerInfo(&config)
if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr
}
err := ErrorHandshake
// try TCP6
if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) {
if IsIPv6(config.peerIPv6) && IsIPv6(pn.config.publicIPv6) {
gLog.Println(LvINFO, "try TCP6")
t.config.linkMode = LinkModeTCP6
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
config.linkMode = LinkModeTCP6
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
@@ -386,61 +377,79 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
// TODO: try UDP6
// try TCP4
if t.config.hasIPv4 == 1 || t.pn.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 || t.pn.config.hasUPNPorNATPMP == 1 {
if config.hasIPv4 == 1 || pn.config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 || pn.config.hasUPNPorNATPMP == 1 {
gLog.Println(LvINFO, "try TCP4")
t.config.linkMode = LinkModeTCP4
if t.config.hasIPv4 == 1 || t.config.hasUPNPorNATPMP == 1 {
t.config.isUnderlayServer = 0
config.linkMode = LinkModeTCP4
if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 {
config.isUnderlayServer = 0
} else {
t.config.isUnderlayServer = 1
config.isUnderlayServer = 1
}
if err = pn.newTunnel(t, tid, isClient); err == nil {
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
// TODO: try UDP4
// try TCPPunch
if t.config.peerNatType == NATCone && t.pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
t.config.linkMode = LinkModeTCPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s
gLog.Println(LvINFO, "try TCP4 Punch")
config.linkMode = LinkModeTCPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
}
}
}
// try UDPPunch
if t.config.peerNatType == NATCone || t.pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
t.config.linkMode = LinkModeUDPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
return t, nil
for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries
if config.peerNatType == NATCone || pn.config.natType == NATCone {
gLog.Println(LvINFO, "try UDP4 Punch")
config.linkMode = LinkModeUDPPunch
config.isUnderlayServer = 0
if t, err = pn.newTunnel(config, tid, isClient); err == nil {
return t, nil
}
}
if !(config.peerNatType == NATCone && pn.config.natType == NATCone) { // not cone2cone, no more try
break
}
}
return nil, err
return nil, ErrorHandshake // only ErrorHandshake will try relay
}
func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) {
if existTunnel := pn.findTunnel(&config); existTunnel != nil {
return existTunnel, nil
}
t = &P2PTunnel{pn: pn,
config: config,
id: tid,
}
t.initPort()
if isClient {
if err := t.connect(); err != nil {
if err = t.connect(); err != nil {
gLog.Println(LvERROR, "p2pTunnel connect error:", err)
return err
return
}
} else {
if err := t.listen(); err != nil {
if err = t.listen(); err != nil {
gLog.Println(LvERROR, "p2pTunnel listen error:", err)
return err
return
}
}
// store it when success
gLog.Printf(LvDEBUG, "store tunnel %d", tid)
pn.allTunnels.Store(tid, t)
return nil
return
}
func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start")
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
var err error
for {
// detect nat type
@@ -450,12 +459,14 @@ func (pn *P2PNetwork) init() error {
pn.config.natType = NATSymmetric
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pS2STest debug")
}
if strings.Contains(pn.config.Node, "openp2pC2CTest") {
pn.config.natType = NATCone
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pC2CTest debug")
}
if err != nil {
gLog.Println(LvDEBUG, "detect NAT type error:", err)
@@ -463,8 +474,13 @@ func (pn *P2PNetwork) init() error {
}
gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/openp2p/v1/login"
config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
uri := "/api/v1/login"
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(rootCA))
config := tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query()
@@ -488,30 +504,32 @@ func (pn *P2PNetwork) init() error {
err = errors.New("get local ip failed")
break
}
go pn.readLoop()
pn.config.mac = getmac(pn.config.localIP)
pn.config.os = getOsName()
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
pn.config.publicIPv6 = rsp.IP.String()
go func() {
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = pn.config.publicIPv6
pn.write(MsgReport, MsgReportBasic, &req)
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
pn.config.publicIPv6 = rsp.IP.String()
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = pn.config.publicIPv6
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
gLog.Println(LvDEBUG, "P2PNetwork init ok")
break
}
@@ -534,31 +552,44 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
case MsgLogin:
// gLog.Println(LevelINFO,string(msg))
rsp := LoginRsp{}
err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong login response:%s", err)
if err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return
}
if rsp.Error != 0 {
gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail)
pn.running = false
} else {
pn.serverTs = rsp.Ts
pn.hbTime = time.Now()
pn.config.Token = rsp.Token
pn.config.User = rsp.User
gConf.setToken(rsp.Token)
gConf.setUser(rsp.User)
if len(rsp.Node) >= MinNodeNameLen {
gConf.setNode(rsp.Node)
pn.config.Node = rsp.Node
}
gConf.save()
pn.localTs = time.Now().Unix()
gLog.Printf(LvINFO, "login ok. user=%s,node=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Node, rsp.Ts, pn.localTs)
gLog.Printf(LvINFO, "login ok. user=%s,node=%s", rsp.User, rsp.Node)
}
case MsgHeartbeat:
gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok")
pn.hbTime = time.Now()
rtt := pn.hbTime.UnixNano() - pn.t1
t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8]))
dt := pn.t1 + rtt/2 - t2
if pn.dtma == 0 {
pn.dtma = dt
} else {
ddt := dt - pn.dt
// if pn.ddt == 0 {
pn.ddt = ddt
// } else {
// pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow
// }
pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow
}
pn.dt = dt
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond))
case MsgPush:
handlePush(pn, head.SubType, msg)
default:
@@ -572,8 +603,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
func (pn *P2PNetwork) readLoop() {
gLog.Printf(LvDEBUG, "P2PNetwork readLoop start")
pn.wg.Add(1)
defer pn.wg.Done()
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running {
pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second))
t, msg, err := pn.conn.ReadMessage()
@@ -694,6 +725,7 @@ func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) {
})
}
// ipv6 will expired need to refresh.
func (pn *P2PNetwork) refreshIPv6(force bool) {
if !force && !IsIPv6(pn.config.publicIPv6) { // not support ipv6, not refresh
return
@@ -713,3 +745,30 @@ func (pn *P2PNetwork) refreshIPv6(force bool) {
}
pn.config.publicIPv6 = string(buf[:n])
}
func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error {
// request peer info
pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode})
head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
if err := json.Unmarshal(body, &rsp); err != nil {
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
config.peerVersion = rsp.Version
config.hasIPv4 = rsp.HasIPv4
config.peerIP = rsp.IPv4
config.peerIPv6 = rsp.IPv6
config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
config.peerNatType = rsp.NatType
///
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"math/rand"
"net"
"reflect"
"sync"
"time"
)
@@ -29,36 +30,9 @@ type P2PTunnel struct {
coneLocalPort int
coneNatPort int
linkModeWeb string // use config.linkmode
punchTs uint64
}
func (t *P2PTunnel) requestPeerInfo() error {
// request peer info
t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode})
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, time.Second*10)
if head == nil {
return ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong QueryPeerInfoRsp:%s", err)
return ErrMsgFormat
}
if rsp.Online == 0 {
return ErrPeerOffline
}
if compareVersion(rsp.Version, LeastSupportVersion) == LESS {
return ErrVersionNotCompatible
}
t.config.peerVersion = rsp.Version
t.config.hasIPv4 = rsp.HasIPv4
t.config.peerIP = rsp.IPv4
t.config.peerIPv6 = rsp.IPv6
t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP
t.config.peerNatType = rsp.NatType
///
return nil
}
func (t *P2PTunnel) initPort() {
t.running = true
t.hbMtx.Lock()
@@ -74,15 +48,15 @@ func (t *P2PTunnel) initPort() {
t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort
}
if t.config.linkMode == LinkModeUDPPunch {
// prepare one random cone hole
// prepare one random cone hole manually
_, natPort, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort)
t.coneLocalPort = localPort
t.coneNatPort = natPort
}
if t.config.linkMode == LinkModeTCPPunch {
// prepare one random cone hole
_, natPort := natTCP(t.pn.config.ServerHost, IfconfigPort1, localPort)
t.coneLocalPort = localPort
// prepare one random cone hole by system automatically
_, natPort, localPort2 := natTCP(t.pn.config.ServerHost, IfconfigPort1)
t.coneLocalPort = localPort2
t.coneNatPort = natPort
}
t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort}
@@ -106,20 +80,19 @@ func (t *P2PTunnel) connect() error {
AppKey: appKey,
Version: OpenP2PVersion,
LinkMode: t.config.linkMode,
IsUnderlayServer: t.config.isUnderlayServer ^ 1,
IsUnderlayServer: t.config.isUnderlayServer ^ 1, // peer
}
if req.Token == 0 { // no relay token
req.Token = t.pn.config.Token
}
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout)
if head == nil {
return errors.New("connect error")
}
rsp := PushConnectRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err)
if err := json.Unmarshal(body, &rsp); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(rsp), err)
return err
}
// gLog.Println(LevelINFO, rsp)
@@ -133,7 +106,8 @@ func (t *P2PTunnel) connect() error {
t.config.peerVersion = rsp.Version
t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP
err = t.start()
t.punchTs = rsp.PunchTs
err := t.start()
if err != nil {
gLog.Println(LvERROR, "handshake error:", err)
err = ErrorHandshake
@@ -154,20 +128,19 @@ func (t *P2PTunnel) setRun(running bool) {
}
func (t *P2PTunnel) isActive() bool {
if !t.isRuning() {
return false
}
t.hbMtx.Lock()
defer t.hbMtx.Unlock()
return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout))
}
func (t *P2PTunnel) checkActive() bool {
hbt := time.Now()
t.hbMtx.Lock()
if t.hbTime.Before(time.Now().Add(-TunnelHeartbeatTime)) {
t.hbMtx.Unlock()
if t.conn == nil {
return false
}
t.hbMtx.Unlock()
// hbtime within TunnelHeartbeatTime, check it now
hbt := time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil)
isActive := false
// wait at most 5s
@@ -179,6 +152,7 @@ func (t *P2PTunnel) checkActive() bool {
t.hbMtx.Unlock()
time.Sleep(time.Millisecond * 100)
}
gLog.Printf(LvINFO, "checkActive %t. hbtime=%d", isActive, t.hbTime)
return isActive
}
@@ -210,6 +184,13 @@ func (t *P2PTunnel) handshake() error {
return err
}
}
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode)
var err error
// TODO: handle NATNone, nodes with public ip has no punching
@@ -238,7 +219,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) {
case LinkModeTCP6:
t.conn, err = t.connectUnderlayTCP6()
case LinkModeTCP4:
t.conn, err = t.connectUnderlayTCP()
t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell
case LinkModeTCPPunch:
t.conn, err = t.connectUnderlayTCP()
case LinkModeUDPPunch:
@@ -295,7 +276,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
return nil, fmt.Errorf("quic listen error:%s", e)
}
}
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "quic dial to ", t.ra.String())
qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil {
@@ -324,8 +305,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
defer gLog.Println(LvINFO, "connectUnderlayTCP end")
var qConn *underlayTCP
if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t)
if err != nil {
return nil, fmt.Errorf("listen TCP error:%s", err)
}
@@ -342,9 +322,20 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
return qConn, nil
}
//else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
gLog.Println(LvDEBUG, "TCP dial to ", t.config.peerIP, ":", t.config.peerConeNatPort)
// client side
if t.config.linkMode == LinkModeTCP4 {
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
} else { //tcp punch should sleep for punch the same time
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort))
qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
if err != nil {
return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err)
@@ -371,7 +362,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
var qConn *underlayTCP6
if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP6(t.coneNatPort, TunnelIdleTimeout)
qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout)
if err != nil {
return nil, fmt.Errorf("listen TCP6 error:%s", err)
}
@@ -389,7 +380,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
}
//else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6)
qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort)
if err != nil {
@@ -429,6 +420,7 @@ func (t *P2PTunnel) readLoop() {
}
switch head.SubType {
case MsgTunnelHeartbeat:
t.hbTime = time.Now()
t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil)
gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id)
case MsgTunnelHeartbeatAck:
@@ -441,7 +433,7 @@ func (t *P2PTunnel) readLoop() {
continue
}
overlayID := binary.LittleEndian.Uint64(body[:8])
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d", t.id, overlayID)
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d bodylen=%d", t.id, overlayID, head.DataLen)
s, ok := t.overlayConns.Load(overlayID)
if !ok {
// debug level, when overlay connection closed, always has some packet not found tunnel
@@ -470,9 +462,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.relay(tunnelID, body[8:])
case MsgRelayHeartbeat:
req := RelayHeartbeat{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID)
@@ -492,9 +483,8 @@ func (t *P2PTunnel) readLoop() {
t.pn.updateAppHeartbeat(req.AppID)
case MsgOverlayConnectReq:
req := OverlayConnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
// app connect only accept token(not relay totp token), avoid someone using the share relay node's token
@@ -504,7 +494,7 @@ func (t *P2PTunnel) readLoop() {
}
overlayID := req.ID
gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req)
gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %s:%d", req.AppID, overlayID, req.DstIP, req.DstPort)
oConn := overlayConn{
tunnel: t,
id: overlayID,
@@ -512,11 +502,13 @@ func (t *P2PTunnel) readLoop() {
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
running: true,
}
if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
} else {
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout)
}
if err != nil {
gLog.Println(LvERROR, err)
@@ -525,7 +517,7 @@ func (t *P2PTunnel) readLoop() {
// calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, 16)
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
@@ -535,9 +527,8 @@ func (t *P2PTunnel) readLoop() {
go oConn.run()
case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req)
if err != nil {
gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err)
if err := json.Unmarshal(body, &req); err != nil {
gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err)
continue
}
overlayID := req.ID
@@ -545,7 +536,7 @@ func (t *P2PTunnel) readLoop() {
i, ok := t.overlayConns.Load(overlayID)
if ok {
oConn := i.(*overlayConn)
oConn.running = false
oConn.Close()
}
default:
}
@@ -589,8 +580,10 @@ func (t *P2PTunnel) listen() error {
FromIP: t.pn.config.publicIP,
ConeNatPort: t.coneNatPort,
ID: t.id,
PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - t.pn.dt),
Version: OpenP2PVersion,
}
t.punchTs = rsp.PunchTs
// only private node set ipv6
if t.config.fromToken == t.pn.config.Token {
t.pn.refreshIPv6(false)
@@ -607,14 +600,7 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) {
t.overlayConns.Range(func(_, i interface{}) bool {
oConn := i.(*overlayConn)
if oConn.appID == appID {
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
oConn.Close()
}
return true
})

View File

@@ -10,9 +10,10 @@ import (
"time"
)
const OpenP2PVersion = "3.5.0"
const ProducnName string = "openp2p"
const OpenP2PVersion = "3.10.3"
const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0"
const (
IfconfigPort1 = 27180
@@ -96,6 +97,7 @@ const (
MsgPushEditNode = 12
MsgPushAPPKey = 13
MsgPushReportLog = 14
MsgPushDstNodeOnline = 15
)
// MsgP2P sub type message
@@ -133,22 +135,26 @@ const (
const (
ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15
TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s
TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond
SymmetricHandshakeAckTimeout = time.Second * 11
PeerAddRelayTimeount = time.Second * 20
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 10
ClientAPITimeout = time.Second * 10
MaxDirectTry = 3
SymmetricHandshakeInterval = time.Millisecond
HandshakeTimeout = time.Second * 5
PeerAddRelayTimeount = time.Second * 30 // peer need times
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
Cone2ConePunchMaxRetry = 1
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5
UDPReadTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10
UnderlayConnectTimeout = time.Second * 10
MaxDirectTry = 3
PunchTsDelay = time.Second * 2
)
// NATNone has public ip
@@ -223,6 +229,9 @@ type PushConnectReq struct {
LinkMode string `json:"linkMode,omitempty"`
IsUnderlayServer int `json:"isServer,omitempty"` // Requset spec peer is server
}
type PushDstNodeOnline struct {
Node string `json:"node,omitempty"`
}
type PushConnectRsp struct {
Error int `json:"error,omitempty"`
From string `json:"from,omitempty"`
@@ -235,6 +244,7 @@ type PushConnectRsp struct {
ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole
FromIP string `json:"fromIP,omitempty"`
ID uint64 `json:"id,omitempty"`
PunchTs uint64 `json:"punchts,omitempty"` // server timestamp
Version string `json:"version,omitempty"`
}
type PushRsp struct {
@@ -341,9 +351,10 @@ type AppInfo struct {
AppName string `json:"appName,omitempty"`
Error string `json:"error,omitempty"`
Protocol string `json:"protocol,omitempty"`
Whitelist string `json:"whitelist,omitempty"`
SrcPort int `json:"srcPort,omitempty"`
Protocol0 string `json:"protocol0,omitempty"`
SrcPort0 int `json:"srcPort0,omitempty"`
SrcPort0 int `json:"srcPort0,omitempty"` // srcport+protocol is uneque, use as old app id
NatType int `json:"natType,omitempty"`
PeerNode string `json:"peerNode,omitempty"`
DstPort int `json:"dstPort,omitempty"`
@@ -430,3 +441,25 @@ type QueryPeerInfoRsp struct {
IPv6 string `json:"IPv6,omitempty"` // if public relay node, ipv6 not set
HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"`
}
const rootCA = `-----BEGIN CERTIFICATE-----
MIIDhTCCAm0CFHm0cd8dnGCbUW/OcS56jf0gvRk7MA0GCSqGSIb3DQEBCwUAMH4x
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEGA1UECgwKb3BlbnAycC5jbjET
MBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwKb3BlbnAycC5jbjEjMCEGCSqG
SIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20wIBcNMjMwODAxMDkwMjMwWhgP
MjEyMzA3MDgwOTAyMzBaMH4xCzAJBgNVBAYTAkNOMQswCQYDVQQIDAJHRDETMBEG
A1UECgwKb3BlbnAycC5jbjETMBEGA1UECwwKb3BlbnAycC5jbjETMBEGA1UEAwwK
b3BlbnAycC5jbjEjMCEGCSqGSIb3DQEJARYUb3BlbnAycC5jbkBnbWFpbC5jb20w
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDWg8wPy5hBLUaY4WOXayKu
+magEz1LAY0krzXYSZaSCvGMwA0cervwAqgKfiiZEhho5UNA5iVOJ6bO1RL9H7Vp
4HuW9BttDU/NQHguD8pyqx06Kaosz5LRw8USz1BCWWFdmi8Mv4I0omtd7m6lbWnY
nrjQKLYPahPW481jUfJPqR6wUTnBuBMr2ZAGqmFR4Lhqs9B1P9GeBfDWNwVApJUC
VEhbElukRJxdUvWeJ5+HMENKQcHCTTgmQbmDLMobHXs3Xf7fT9qC76wOe9LFHI6L
dAww9gryQhxWauQl1NO8aGJTFu+3wgnKBdTMJmF/1iuZYXJOCR1solwqU1hCgBsj
AgMBAAEwDQYJKoZIhvcNAQELBQADggEBADp153YNVN8p6/3PLnXxHBDeDViAfeQd
VJmy8eH1LTq/xtUY71HGSpL7iIBNoQdDTHfsg3c6ZANBCxbO/7AhFAzPt1aK8eHy
XuEiW0Z6R8np1Khh3alCOfD15tKcjok//Wxisbz+YItlbDus/eWRbLGB3HGrzn4l
GB18jw+G7o4U3rGX8agHqVGQEd06gk1ZaprASpTGwSsv4A5ehosjT1d7re8Z5eD4
RVtXS+DplMClQ5QSlv3StwcWOsjyiAimNfLEU5xoEfq17yOJUTU1OTL4YOt16QUc
C1tnzFr3k/ioqFR7cnyzNrbjlfPOmO9l2WReEbMP3bvaSHm6EcpJKS8=
-----END CERTIFICATE-----`

View File

@@ -1,35 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
)
const TOTPStep = 30 // 30s
func GenTOTP(token uint64, ts int64) uint64 {
step := ts / TOTPStep
tbuff := make([]byte, 8)
binary.LittleEndian.PutUint64(tbuff, token)
mac := hmac.New(sha256.New, tbuff)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(step))
mac.Write(b)
num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8])
// fmt.Printf("%x\n", mac.Sum(nil))
return num
}
func VerifyTOTP(code uint64, token uint64, ts int64) bool {
if code == 0 {
return false
}
if code == token {
return true
}
if code == GenTOTP(token, ts) || code == GenTOTP(token, ts-TOTPStep) || code == GenTOTP(token, ts+TOTPStep) {
return true
}
return false
}

View File

@@ -1,36 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"testing"
"time"
)
func TestTOTP(t *testing.T) {
for i := 0; i < 20; i++ {
ts := time.Now().Unix()
code := GenTOTP(13666999958022769123, ts)
t.Log(code)
if !VerifyTOTP(code, 13666999958022769123, ts) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts-10) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts+10) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769123, ts+60) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769124, ts+1) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769125, ts+1) {
t.Error("TOTP error")
}
time.Sleep(time.Second)
t.Log("round", i, " ", ts, " test ok")
}
}

View File

@@ -18,9 +18,9 @@ func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16,
return conn.WriteTo(msg, dst)
}
func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
func UDPRead(conn *net.UDPConn, timeout time.Duration) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
if timeout > 0 {
deadline := time.Now().Add(time.Millisecond * time.Duration(timeout))
deadline := time.Now().Add(timeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LvERROR, "SetReadDeadline error")

View File

@@ -15,10 +15,10 @@ import (
"sync"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/quic-go/quic-go"
)
//quic.DialContext do not support version 44,disable it
// quic.DialContext do not support version 44,disable it
var quicVersion []quic.VersionNumber
type underlayQUIC struct {
@@ -87,7 +87,7 @@ func (conn *underlayQUIC) CloseListener() {
}
func (conn *underlayQUIC) Accept() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout)
defer cancel()
sess, err := conn.listener.Accept(ctx)
if err != nil {

View File

@@ -67,21 +67,30 @@ func (conn *underlayTCP) Close() error {
return conn.Conn.Close()
}
func listenTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) {
func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) {
if mode == LinkModeTCPPunch {
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) // TODO: timeout
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err)
return nil, err
}
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil
}
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort))
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout))
l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept()
defer l.Close()
if err != nil {
@@ -94,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e
var c net.Conn
var err error
if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout)
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
} else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout)
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
}
if err != nil {

View File

@@ -73,7 +73,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
return nil, err
}
defer l.Close()
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout))
l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept()
defer l.Close()
if err != nil {
@@ -83,7 +83,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
}
func dialTCP6(host string, port int) (*underlayTCP6, error) {
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), SymmetricHandshakeAckTimeout)
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout)
if err != nil {
gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err)
return nil, err

View File

@@ -5,6 +5,7 @@ import (
"archive/zip"
"compress/gzip"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
@@ -19,15 +20,19 @@ import (
func update(host string, port int) {
gLog.Println(LvINFO, "update start")
defer gLog.Println(LvINFO, "update end")
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(rootCA))
c := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{RootCAs: caCertPool,
InsecureSkipVerify: false},
},
Timeout: time.Second * 30,
}
goos := runtime.GOOS
goarch := runtime.GOARCH
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s", host, port, OpenP2PVersion, goos, goarch))
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node))
if err != nil {
gLog.Println(LvERROR, "update:query update list failed:", err)
return
@@ -43,8 +48,7 @@ func update(host string, port int) {
return
}
updateInfo := UpdateInfo{}
err = json.Unmarshal(rspBuf, &updateInfo)
if err != nil {
if err = json.Unmarshal(rspBuf, &updateInfo); err != nil {
gLog.Println(LvERROR, rspBuf, " update info decode error:", err)
return
}
@@ -69,7 +73,7 @@ func updateFile(url string, checksum string, dst string) error {
return err
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
}
client := &http.Client{Transport: tr}
response, err := client.Get(url)

View File

@@ -5,6 +5,7 @@ package openp2p
import (
"bytes"
"crypto/tls"
"encoding/xml"
"errors"
"fmt"
@@ -181,7 +182,12 @@ func localIPv4() string { // TODO: multi nic will wrong
}
func getServiceURL(rootURL string) (url, urnDomain string, err error) {
r, err := http.Get(rootURL)
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Timeout: time.Second * 3}
r, err := client.Get(rootURL)
if err != nil {
return
}

View File

@@ -21,7 +21,7 @@ func setRLimit() error {
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Cur = 10240
limit.Cur = 65536
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}

69
core/util_freebsd.go Normal file
View File

@@ -0,0 +1,69 @@
package openp2p
import (
"bufio"
"bytes"
"io/ioutil"
"os"
"runtime"
"strings"
"syscall"
)
const (
defaultInstallPath = "/usr/local/openp2p"
defaultBinName = "openp2p"
)
func getOsName() (osName string) {
var sysnamePath string
sysnamePath = "/etc/redhat-release"
_, err := os.Stat(sysnamePath)
if err != nil && os.IsNotExist(err) {
str := "PRETTY_NAME="
f, err := os.Open("/etc/os-release")
if err == nil {
buf := bufio.NewReader(f)
for {
line, err := buf.ReadString('\n')
if err == nil {
line = strings.TrimSpace(line)
pos := strings.Count(line, str)
if pos > 0 {
len1 := len([]rune(str)) + 1
rs := []rune(line)
osName = string(rs[len1 : (len(rs))-1])
break
}
} else {
break
}
}
}
} else {
buff, err := ioutil.ReadFile(sysnamePath)
if err == nil {
osName = string(bytes.TrimSpace(buff))
}
}
if osName == "" {
osName = "FreeBSD"
}
return
}
func setRLimit() error {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 65536
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
return nil
}
func setFirewall() {
}

View File

@@ -64,7 +64,7 @@ func setRLimit() error {
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 1024 * 1024
limit.Max = 65536
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err

View File

@@ -45,9 +45,9 @@ func setFirewall() {
}
if isXP {
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall del allowedprogram "%s"`, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProducnName, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProductName, fullPath)).Run()
} else { // win7 or later
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProducnName)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProducnName, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProductName)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProductName, fullPath)).Run()
}
}

12
docker/Dockerfile Executable file
View File

@@ -0,0 +1,12 @@
FROM alpine:3.18.2
# Replace the default Alpine repositories with Aliyun mirrors
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \
apk add --no-cache ca-certificates && \
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
COPY get-client.sh /
RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh
ENTRYPOINT ["/openp2p"]

43
docker/get-client.sh Executable file
View File

@@ -0,0 +1,43 @@
#!/bin/sh
echo "Running on platform: $TARGETPLATFORM"
# TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/')
echo "Running on platform: $TARGETPLATFORM"
sysType="linux-amd64"
archType=$(uname -m)
if [[ $archType == aarch64 ]] ;
then
sysType="linux-arm64"
elif [[ $archType == arm* ]] ;
then
sysType="linux-arm"
elif [[ $archType == i*86 ]] ;
then
sysType="linux-386"
elif [[ $archType == mips ]] ;
then
sysType="linux-mipsle"
ls /lib |grep mipsel
if [[ $? -ne 0 ]]; then
# mipsel not found, it's mipseb
sysType="linux-mipsbe"
fi
fi
url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz"
echo "download $url start"
if [ -f /usr/bin/curl ]; then
curl -k -o openp2p.tar.gz $url
else
wget --no-check-certificate -O openp2p.tar.gz $url
fi
if [ $? -ne 0 ]; then
echo "download error $?"
exit 9
fi
echo "download ok"
tar -xzvf openp2p.tar.gz
chmod +x openp2p
pwd
ls -l
exit 0

33
go.mod
View File

@@ -3,26 +3,27 @@ module openp2p
go 1.18
require (
github.com/emirpasic/gods v1.18.1
github.com/gorilla/websocket v1.4.2
github.com/kardianos/service v1.2.0
github.com/lucas-clemente/quic-go v0.27.0
github.com/openp2p-cn/go-reuseport v0.3.2
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
github.com/openp2p-cn/service v1.0.0
github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed
github.com/quic-go/quic-go v0.34.0
golang.org/x/sys v0.5.0
)
require (
github.com/cheekybits/genny v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/tools v0.1.12 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/kardianos/service v1.2.2 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/tools v0.2.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)