Compare commits

..

4 Commits

Author SHA1 Message Date
TenderIronh
8ebdf3341e 3.9.1 2023-07-21 22:25:33 +08:00
TenderIronh
b667e5b766 3.8.0 2023-05-07 20:42:06 +08:00
TenderIronh
cd415e7bf4 3.6.11 2023-03-25 12:00:27 +08:00
TenderIronh
67e3a8915a 3.6.8 2023-03-22 23:11:38 +08:00
28 changed files with 486 additions and 259 deletions

View File

@@ -101,6 +101,22 @@ cd到代码根目录执行
```
make
```
手动编译特定系统和架构
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
比如linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
近期计划:
@@ -110,12 +126,12 @@ make
4. ~~建立网站用户可以在网站管理所有P2PApp和设备。查看设备在线状态升级增删查改重启P2PApp等~~(100%)
5. 建立公众号用户可在微信公众号管理所有P2PApp和设备
6. 客户端提供WebUI
7. 支持自有服务器,开源服务器程序
7. ~~支持自有服务器,开源服务器程序~~(100%)
8. 共享节点调度模型优化,对不同的运营商优化
9. 方便二次开发提供API和lib
10. 应用层支持UDP协议实现很简单但UDP应用较少暂不急(100%)
10. ~~应用层支持UDP协议实现很简单但UDP应用较少暂不急~~(100%)
11. 底层通信支持KCP协议目前仅支持QuicKCP专门对延时优化被游戏加速器广泛使用可以牺牲一定的带宽降低延时
12. 支持Android系统让旧手机焕发青春变成移动网关
12. ~~支持Android系统让旧手机焕发青春变成移动网关~~(100%)
13. 支持Windows网上邻居共享文件
14. 内网直连优化,用处不大,估计就用户测试时用到
15. ~~支持UPNP~~(100%)

View File

@@ -109,6 +109,23 @@ cd root directory of the socure code and execute
make
```
build specified os and arch.
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
For example linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
Short-Term:
1. ~~Support IPv6.~~(100%)
@@ -117,12 +134,12 @@ Short-Term:
4. ~~Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .~~(100%)
5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website.
6. Provide WebUI on client side.
7. Support private server, open source server program.
7. ~~Support private server, open source server program.~~(100%)
8. Optimize our share scheduling model for different network operators.
9. Provide REST APIs and libary for secondary development.
10. ~~Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.~~(100%)
11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag.
12. Support Android platform, let the phones to be mobile gateway.
12. ~~Support Android platform, let the phones to be mobile gateway.~~(100%)
13. Support SMB Windows neighborhood.
14. Direct connection on intranet, for testing.
15. ~~Support UPNP.~~(100%)

View File

@@ -96,4 +96,9 @@ firewall-cmd --state
C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos
sudo /usr/local/openp2p/openp2p uninstall
```
```
## Docker运行
```
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -98,4 +98,9 @@ firewall-cmd --state
C:\Program Files\OpenP2P\openp2p.exe uninstall
# linux,macos
sudo /usr/local/openp2p/openp2p uninstall
```
## Run with Docker
```
docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME
```

View File

@@ -1,7 +1,9 @@
package main
import openp2p "openp2p/core"
import (
core "openp2p/core"
)
func main() {
openp2p.Run()
core.Run()
}

View File

@@ -58,7 +58,18 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
c.Apps[i].Enabled = enabled
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
return
break
}
}
c.save()
}
func (c *Config) retryApp(peerNode string) {
c.mtx.Lock()
defer c.mtx.Unlock()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].PeerNode == peerNode {
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
}
}
}
@@ -66,6 +77,7 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
func (c *Config) add(app AppConfig, override bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
if app.SrcPort == 0 || app.DstPort == 0 {
gLog.Println(LvERROR, "invalid app ", app)
return
@@ -87,17 +99,19 @@ func (c *Config) delete(app AppConfig) {
}
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
c.Apps = append(c.Apps[:i], c.Apps[i+1:]...)
return
}
}
}
func (c *Config) save() {
c.mtx.Lock()
defer c.mtx.Unlock()
// c.mtx.Lock()
// defer c.mtx.Unlock() // internal call
data, _ := json.MarshalIndent(c, "", " ")
err := ioutil.WriteFile("config.json", data, 0644)
if err != nil {
@@ -132,6 +146,7 @@ func (c *Config) load() error {
func (c *Config) setToken(token uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
if token != 0 {
c.Network.Token = token
}
@@ -139,16 +154,19 @@ func (c *Config) setToken(token uint64) {
func (c *Config) setUser(user string) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.User = user
}
func (c *Config) setNode(node string) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.Node = node
}
func (c *Config) setShareBandwidth(bw int) {
c.mtx.Lock()
defer c.mtx.Unlock()
defer c.save()
c.Network.ShareBandwidth = bw
}

View File

@@ -8,12 +8,14 @@ import (
var (
// ErrorS2S string = "s2s is not supported"
// ErrorHandshake string = "handshake error"
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrNetwork = errors.New("network error")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
)

View File

@@ -28,7 +28,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvINFO, "%s is connecting...", req.From)
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
@@ -43,8 +43,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
// verify totp token or token
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts
t.Verify(req.Token, pn.config.Token, time.Now().Unix()) {
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt) { // localTs may behind, auto adjust ts
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
@@ -176,7 +175,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
pn.write(MsgReport, MsgReportApps, &req)
case MsgPushReportLog:
gLog.Println(LvINFO, "MsgPushReportLog")
gLog.Println(LvDEBUG, "MsgPushReportLog")
req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
@@ -241,7 +240,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
newConf.Protocol = newApp.Protocol
newConf.SrcPort = newApp.SrcPort
gConf.add(newConf, false)
gConf.save() // save quickly for the next request reportApplist
pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end
// autoReconnect will auto AddApp
// pn.AddApp(config)
@@ -256,7 +254,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
gConf.setNode(req.NewName)
gConf.setShareBandwidth(req.Bandwidth)
gConf.save()
// TODO: hot reload
os.Exit(0)
case MsgPushSwitchApp:
@@ -274,6 +271,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]

View File

@@ -23,11 +23,11 @@ func handshakeC2C(t *P2PTunnel) (err error) {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err)
return err
}
ra, head, _, _, err := UDPRead(conn, 5000)
ra, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil {
time.Sleep(time.Millisecond * 200)
gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read")
ra, head, _, _, err = UDPRead(conn, 5000)
ra, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err)
return err
@@ -38,7 +38,7 @@ func handshakeC2C(t *P2PTunnel) (err error) {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err)
return err
@@ -66,7 +66,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Printf(LvDEBUG, "handshakeC2S start")
defer gLog.Printf(LvDEBUG, "handshakeC2S end")
// even if read timeout, continue handshake
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randPorts := r.Perm(65532)
conn, err := net.ListenUDP("udp", t.la)
@@ -92,7 +92,7 @@ func handshakeC2S(t *P2PTunnel) error {
gLog.Println(LvDEBUG, "send symmetric handshake end")
return nil
}()
deadline := time.Now().Add(SymmetricHandshakeAckTimeout)
deadline := time.Now().Add(HandshakeTimeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error")
@@ -140,7 +140,7 @@ func handshakeS2C(t *P2PTunnel) error {
}
defer conn.Close()
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id})
_, head, _, _, err := UDPRead(conn, 10000)
_, head, _, _, err := UDPRead(conn, HandshakeTimeout)
if err != nil {
// gLog.Println(LevelDEBUG, "one of the handshake error:", err)
return err
@@ -155,7 +155,7 @@ func handshakeS2C(t *P2PTunnel) error {
if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake {
gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id)
UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id})
_, head, _, _, err = UDPRead(conn, 5000)
_, head, _, _, err = UDPRead(conn, HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "handshakeS2C handshake error")
return err
@@ -174,7 +174,7 @@ func handshakeS2C(t *P2PTunnel) error {
t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id})
select {
case <-time.After(SymmetricHandshakeAckTimeout):
case <-time.After(HandshakeTimeout):
return fmt.Errorf("wait handshake failed")
case la := <-gotCh:
gLog.Println(LvDEBUG, "symmetric handshake ok", la)

View File

@@ -51,6 +51,7 @@ type logger struct {
pid int
maxLogSize int64
mode int
stdLogger *log.Logger
}
func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger {
@@ -73,7 +74,7 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
}
os.Chmod(logFilePath, 0644)
logfiles[lv] = f
loggers[lv] = log.New(f, "", log.LstdFlags)
loggers[lv] = log.New(f, "", log.LstdFlags|log.Lmicroseconds)
}
var le string
if runtime.GOOS == "windows" {
@@ -81,7 +82,8 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
} else {
le = "\n"
}
pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode}
pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode, log.New(os.Stdout, "", 0)}
pLog.stdLogger.SetFlags(log.LstdFlags | log.Lmicroseconds)
go pLog.checkFile()
return pLog
}
@@ -142,7 +144,7 @@ func (l *logger) Printf(level LogLevel, format string, params ...interface{}) {
l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Printf("%d %s "+format+l.lineEnding, params...)
l.stdLogger.Printf("%d %s "+format+l.lineEnding, params...)
}
}
@@ -159,6 +161,6 @@ func (l *logger) Println(level LogLevel, params ...interface{}) {
l.loggers[0].Print(params...)
}
if l.mode == LogConsole || l.mode == LogFileAndConsole {
log.Print(params...)
l.stdLogger.Print(params...)
}
}

View File

@@ -3,7 +3,6 @@ package openp2p
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"strconv"
@@ -14,27 +13,29 @@ import (
reuse "github.com/openp2p-cn/go-reuseport"
)
func natTCP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int) {
func natTCP(serverHost string, serverPort int) (publicIP string, publicPort int, localPort int) {
// dialer := &net.Dialer{
// LocalAddr: &net.TCPAddr{
// IP: net.ParseIP("0.0.0.0"),
// Port: localPort,
// },
// }
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", localPort), fmt.Sprintf("%s:%d", serverHost, serverPort), time.Second*5)
conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", 0), fmt.Sprintf("%s:%d", serverHost, serverPort), NatTestTimeout)
// conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort))
// log.Println(LvINFO, conn.LocalAddr())
if err != nil {
fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err)
return
}
defer conn.Close()
localPort, _ = strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1])
_, wrerr := conn.Write([]byte("1"))
if wrerr != nil {
fmt.Printf("Write error: %s\n", wrerr)
return
}
b := make([]byte, 1000)
conn.SetReadDeadline(time.Now().Add(time.Second * 5))
conn.SetReadDeadline(time.Now().Add(NatTestTimeout))
n, rderr := conn.Read(b)
if rderr != nil {
fmt.Printf("Read error: %s\n", rderr)
@@ -91,7 +92,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasIPvr int, hasUPNPorNATPMP int, err error) {
// the random local port may be used by other.
localPort := int(rand.Uint32()%15000 + 50000)
echoPort := P2PNetworkInstance(nil).config.TCPPort
echoPort := gConf.Network.TCPPort
ip1, port1, err := natTest(host, udp1, localPort)
if err != nil {
return "", 0, 0, 0, err
@@ -151,14 +152,14 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP
gLog.Println(LvDEBUG, "could not perform UPNP external address:", err)
break
}
log.Println("PublicIP:", ext)
gLog.Println(LvINFO, "PublicIP:", ext)
externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30)
externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30) // 30 seconds fot upnp testing
if err != nil {
gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort)
break
} else {
nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800)
nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days for tcp connection
}
}
gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort)

View File

@@ -35,24 +35,23 @@ type overlayConn struct {
// for udp
connUDP *net.UDPConn
remoteAddr net.Addr
udpRelayData chan []byte
udpData chan []byte
lastReadUDPTs time.Time
}
func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "%d overlayConn run start", oConn.id)
defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id)
oConn.running = true
oConn.lastReadUDPTs = time.Now()
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
buffer := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
reuseBuff := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
for oConn.running && oConn.tunnel.isRuning() {
buff, dataLen, err := oConn.Read(readBuf)
readBuff, dataLen, err := oConn.Read(reuseBuff)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
@@ -61,9 +60,9 @@ func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
break
}
payload := buff[:dataLen]
payload := readBuff[:dataLen]
if oConn.appKey != 0 {
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, readBuff[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
@@ -85,20 +84,22 @@ func (oConn *overlayConn) run() {
}
oConn.tunnel.overlayConns.Delete(oConn.id)
// notify peer disconnect
if oConn.isClient {
req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
}
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) {
if !oConn.running {
err = ErrOverlayConnDisconnect
return
}
if oConn.connUDP != nil {
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
err = errors.New("udp close")
@@ -106,15 +107,15 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
if oConn.remoteAddr != nil { // as server
select {
case buff = <-oConn.udpRelayData:
n = len(buff)
case buff = <-oConn.udpData:
dataLen = len(buff) - PaddingSize
oConn.lastReadUDPTs = time.Now()
case <-time.After(time.Second * 10):
err = ErrDeadlineExceeded
}
} else { // as client
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
oConn.connUDP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
dataLen, _, err = oConn.connUDP.ReadFrom(reuseBuff)
if err == nil {
oConn.lastReadUDPTs = time.Now()
}
@@ -122,15 +123,21 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
return
}
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
if oConn.connTCP != nil {
oConn.connTCP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
dataLen, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
}
return
}
// calling by p2pTunnel
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
if !oConn.running {
return 0, ErrOverlayConnDisconnect
}
if oConn.connUDP != nil {
if oConn.remoteAddr == nil {
n, err = oConn.connUDP.Write(buff)
@@ -142,9 +149,25 @@ func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
}
return
}
n, err = oConn.connTCP.Write(buff)
if oConn.connTCP != nil {
n, err = oConn.connTCP.Write(buff)
}
if err != nil {
oConn.running = false
}
return
}
func (oConn *overlayConn) Close() (err error) {
oConn.running = false
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
return nil
}

View File

@@ -72,6 +72,7 @@ func (app *p2pApp) listenTCP() error {
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// pre-calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -81,7 +82,7 @@ func (app *p2pApp) listenTCP() error {
oConn.appKeyBytes = encryptKey
}
app.tunnel.overlayConns.Store(oConn.id, &oConn)
gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d", oConn.id)
gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d, %s", oConn.id, oConn.connTCP.RemoteAddr())
// tell peer connect
req := OverlayConnectReq{ID: oConn.id,
Token: app.tunnel.pn.config.Token,
@@ -100,6 +101,8 @@ func (app *p2pApp) listenTCP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
}
return nil
@@ -114,10 +117,10 @@ func (app *p2pApp) listenUDP() error {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
}
buffer := make([]byte, 64*1024)
buffer := make([]byte, 64*1024+PaddingSize)
udpID := make([]byte, 8)
for {
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
app.listenerUDP.SetReadDeadline(time.Now().Add(UDPReadTimeout))
len, remoteAddr, err := app.listenerUDP.ReadFrom(buffer)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
@@ -127,8 +130,8 @@ func (app *p2pApp) listenUDP() error {
break
}
} else {
b := bytes.Buffer{}
b.Write(buffer[:len])
dupData := bytes.Buffer{} // should uses memory pool
dupData.Write(buffer[:len+PaddingSize])
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
@@ -139,19 +142,20 @@ func (app *p2pApp) listenUDP() error {
udpID[3] = a[3]
udpID[4] = byte(port)
udpID[5] = byte(port >> 8)
id := binary.LittleEndian.Uint64(udpID)
id := binary.LittleEndian.Uint64(udpID) // convert remoteIP:port to uint64
s, ok := app.tunnel.overlayConns.Load(id)
if !ok {
oConn := overlayConn{
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpRelayData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -180,8 +184,10 @@ func (app *p2pApp) listenUDP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
oConn.udpRelayData <- b.Bytes()
oConn.udpData <- dupData.Bytes()
}
// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
@@ -189,7 +195,7 @@ func (app *p2pApp) listenUDP() error {
if !ok {
continue
}
overlayConn.udpRelayData <- b.Bytes()
overlayConn.udpData <- dupData.Bytes()
}
}
return nil

View File

@@ -22,6 +22,13 @@ var (
once sync.Once
)
const (
retryLimit = 20
retryInterval = 10 * time.Second
dtma = 20
ddtma = 5
)
type P2PNetwork struct {
conn *websocket.Conn
online bool
@@ -29,9 +36,12 @@ type P2PNetwork struct {
restartCh chan bool
wgReconnect sync.WaitGroup
writeMtx sync.Mutex
serverTs int64
localTs int64
hbTime time.Time
// for sync server time
t1 int64 // nanoSeconds
dt int64 // client faster then server dt nanoSeconds
dtma int64
ddt int64 // differential of dt
// msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID
msgMapMtx sync.Mutex
@@ -50,6 +60,8 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
running: true,
msgMap: make(map[uint64]chan []byte),
limiter: newBandwidthLimiter(config.ShareBandwidth),
dt: 0,
ddt: 0,
}
instance.msgMap[0] = make(chan []byte) // for gateway
if config != nil {
@@ -63,16 +75,18 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
}
func (pn *P2PNetwork) run() {
go pn.autorunApp()
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "")
for pn.running {
select {
case <-heartbeatTimer.C:
pn.t1 = time.Now().UnixNano()
pn.write(MsgHeartbeat, 0, "")
case <-pn.restartCh:
pn.online = false
pn.wgReconnect.Wait() // wait read/write goroutine end
pn.wgReconnect.Wait() // wait read/autorunapp goroutine end
time.Sleep(ClientAPITimeout)
err := pn.init()
if err != nil {
gLog.Println(LvERROR, "P2PNetwork init error:", err)
@@ -82,7 +96,7 @@ func (pn *P2PNetwork) run() {
}
func (pn *P2PNetwork) Connect(timeout int) bool {
// waiting for login response
// waiting for heartbeat
for i := 0; i < (timeout / 1000); i++ {
if pn.hbTime.After(time.Now().Add(-NetworkHeartbeatTime)) {
return true
@@ -119,37 +133,39 @@ func (pn *P2PNetwork) runAll() {
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum > 0 {
if config.retryNum >= retryLimit {
continue
}
if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // run normally 15min, reset retrynum
config.retryNum = 0
}
}
config.retryNum++
config.retryTime = time.Now()
if config.retryNum > 20 {
config.Enabled = 0
gLog.Printf(LvWARN, "app %s has stopped retry, manually enable it on Web console", config.AppName)
continue
}
config.nextRetryTime = time.Now().Add(time.Second * 10)
config.nextRetryTime = time.Now().Add(retryInterval)
config.connectTime = time.Now()
config.peerToken = pn.config.Token
gConf.mtx.Unlock() // AddApp will take a period of time
gConf.mtx.Unlock() // AddApp will take a period of time, let outside modify gConf
err := pn.AddApp(*config)
gConf.mtx.Lock()
if err != nil {
config.errMsg = err.Error()
if err == ErrPeerOffline { // stop retry, waiting for online
config.retryNum = retryLimit
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", config.PeerNode)
}
}
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start")
for pn.running {
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running && pn.online {
time.Sleep(time.Second)
if !pn.online {
continue
}
pn.runAll()
}
gLog.Println(LvINFO, "autorunApp end")
@@ -160,7 +176,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri
defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode)
// request a relay node or specify manually(TODO)
pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode})
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10)
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout)
if head == nil {
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
@@ -364,6 +380,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
initErr := t.requestPeerInfo()
if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr
}
var err error
@@ -400,6 +417,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
t.config.linkMode = LinkModeTCPPunch
t.config.isUnderlayServer = 0
if err = pn.newTunnel(t, tid, isClient); err == nil {
gLog.Println(LvINFO, "TCP4 Punch ok")
return t, nil
}
}
@@ -436,10 +454,7 @@ func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start")
pn.wgReconnect.Add(1)
go func() { //reconnect at least 5s
time.Sleep(NatTestTimeout)
pn.wgReconnect.Done()
}()
defer pn.wgReconnect.Done()
var err error
for {
// detect nat type
@@ -449,12 +464,14 @@ func (pn *P2PNetwork) init() error {
pn.config.natType = NATSymmetric
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pS2STest debug")
}
if strings.Contains(pn.config.Node, "openp2pC2CTest") {
pn.config.natType = NATCone
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pC2CTest debug")
}
if err != nil {
gLog.Println(LvDEBUG, "detect NAT type error:", err)
@@ -462,7 +479,7 @@ func (pn *P2PNetwork) init() error {
}
gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/openp2p/v1/login"
uri := "/api/v1/login"
config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
@@ -512,6 +529,7 @@ func (pn *P2PNetwork) init() error {
req.IPv6 = pn.config.publicIPv6
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
gLog.Println(LvDEBUG, "P2PNetwork init ok")
break
}
@@ -543,8 +561,6 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail)
pn.running = false
} else {
pn.serverTs = rsp.Ts
pn.hbTime = time.Now()
pn.config.Token = rsp.Token
pn.config.User = rsp.User
gConf.setToken(rsp.Token)
@@ -553,13 +569,28 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
gConf.setNode(rsp.Node)
pn.config.Node = rsp.Node
}
gConf.save()
pn.localTs = time.Now().Unix()
gLog.Printf(LvINFO, "login ok. user=%s,node=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Node, rsp.Ts, pn.localTs)
gLog.Printf(LvINFO, "login ok. user=%s,node=%s", rsp.User, rsp.Node)
}
case MsgHeartbeat:
gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok")
pn.hbTime = time.Now()
rtt := pn.hbTime.UnixNano() - pn.t1
t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8]))
dt := pn.t1 + rtt/2 - t2
if pn.dtma == 0 {
pn.dtma = dt
} else {
ddt := dt - pn.dt
// if pn.ddt == 0 {
pn.ddt = ddt
// } else {
// pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow
// }
pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow
}
pn.dt = dt
gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond))
case MsgPush:
handlePush(pn, head.SubType, msg)
default:
@@ -695,6 +726,7 @@ func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) {
})
}
// ipv6 will expired need to refresh.
func (pn *P2PNetwork) refreshIPv6(force bool) {
if !force && !IsIPv6(pn.config.publicIPv6) { // not support ipv6, not refresh
return

View File

@@ -29,14 +29,15 @@ type P2PTunnel struct {
coneLocalPort int
coneNatPort int
linkModeWeb string // use config.linkmode
punchTs uint64
}
func (t *P2PTunnel) requestPeerInfo() error {
// request peer info
t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode})
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, time.Second*10)
head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout)
if head == nil {
return ErrPeerOffline
return ErrNetwork // network error, should not be ErrPeerOffline
}
rsp := QueryPeerInfoRsp{}
err := json.Unmarshal(body, &rsp)
@@ -74,15 +75,15 @@ func (t *P2PTunnel) initPort() {
t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort
}
if t.config.linkMode == LinkModeUDPPunch {
// prepare one random cone hole
// prepare one random cone hole manually
_, natPort, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort)
t.coneLocalPort = localPort
t.coneNatPort = natPort
}
if t.config.linkMode == LinkModeTCPPunch {
// prepare one random cone hole
_, natPort := natTCP(t.pn.config.ServerHost, IfconfigPort1, localPort)
t.coneLocalPort = localPort
// prepare one random cone hole by system automatically
_, natPort, localPort2 := natTCP(t.pn.config.ServerHost, IfconfigPort1)
t.coneLocalPort = localPort2
t.coneNatPort = natPort
}
t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort}
@@ -112,7 +113,7 @@ func (t *P2PTunnel) connect() error {
req.Token = t.pn.config.Token
}
t.pn.push(t.config.PeerNode, MsgPushConnectReq, req)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10)
head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout)
if head == nil {
return errors.New("connect error")
}
@@ -133,6 +134,7 @@ func (t *P2PTunnel) connect() error {
t.config.peerVersion = rsp.Version
t.config.peerConeNatPort = rsp.ConeNatPort
t.config.peerIP = rsp.FromIP
t.punchTs = rsp.PunchTs
err = t.start()
if err != nil {
gLog.Println(LvERROR, "handshake error:", err)
@@ -213,6 +215,13 @@ func (t *P2PTunnel) handshake() error {
return err
}
}
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode)
var err error
// TODO: handle NATNone, nodes with public ip has no punching
@@ -241,7 +250,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) {
case LinkModeTCP6:
t.conn, err = t.connectUnderlayTCP6()
case LinkModeTCP4:
t.conn, err = t.connectUnderlayTCP()
t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell
case LinkModeTCPPunch:
t.conn, err = t.connectUnderlayTCP()
case LinkModeUDPPunch:
@@ -298,7 +307,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) {
return nil, fmt.Errorf("quic listen error:%s", e)
}
}
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "quic dial to ", t.ra.String())
qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout)
if e != nil {
@@ -327,8 +336,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
defer gLog.Println(LvINFO, "connectUnderlayTCP end")
var qConn *underlayTCP
if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t)
if err != nil {
return nil, fmt.Errorf("listen TCP error:%s", err)
}
@@ -345,9 +353,20 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) {
return qConn, nil
}
//else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
gLog.Println(LvDEBUG, "TCP dial to ", t.config.peerIP, ":", t.config.peerConeNatPort)
// client side
if t.config.linkMode == LinkModeTCP4 {
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
} else { //tcp punch should sleep for punch the same time
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort))
qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode)
if err != nil {
return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err)
@@ -374,7 +393,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
var qConn *underlayTCP6
if t.config.isUnderlayServer == 1 {
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
qConn, err = listenTCP6(t.coneNatPort, TunnelIdleTimeout)
qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout)
if err != nil {
return nil, fmt.Errorf("listen TCP6 error:%s", err)
}
@@ -392,7 +411,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) {
}
//else
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5)
t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout)
gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6)
qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort)
if err != nil {
@@ -444,7 +463,7 @@ func (t *P2PTunnel) readLoop() {
continue
}
overlayID := binary.LittleEndian.Uint64(body[:8])
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d", t.id, overlayID)
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d bodylen=%d", t.id, overlayID, head.DataLen)
s, ok := t.overlayConns.Load(overlayID)
if !ok {
// debug level, when overlay connection closed, always has some packet not found tunnel
@@ -507,7 +526,7 @@ func (t *P2PTunnel) readLoop() {
}
overlayID := req.ID
gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req)
gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %s:%d", req.AppID, overlayID, req.DstIP, req.DstPort)
oConn := overlayConn{
tunnel: t,
id: overlayID,
@@ -515,11 +534,13 @@ func (t *P2PTunnel) readLoop() {
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
running: true,
}
if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
} else {
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout)
}
if err != nil {
gLog.Println(LvERROR, err)
@@ -528,7 +549,7 @@ func (t *P2PTunnel) readLoop() {
// calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, 16)
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
@@ -548,7 +569,7 @@ func (t *P2PTunnel) readLoop() {
i, ok := t.overlayConns.Load(overlayID)
if ok {
oConn := i.(*overlayConn)
oConn.running = false
oConn.Close()
}
default:
}
@@ -592,8 +613,10 @@ func (t *P2PTunnel) listen() error {
FromIP: t.pn.config.publicIP,
ConeNatPort: t.coneNatPort,
ID: t.id,
PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - t.pn.dt),
Version: OpenP2PVersion,
}
t.punchTs = rsp.PunchTs
// only private node set ipv6
if t.config.fromToken == t.pn.config.Token {
t.pn.refreshIPv6(false)
@@ -610,14 +633,7 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) {
t.overlayConns.Range(func(_, i interface{}) bool {
oConn := i.(*overlayConn)
if oConn.appID == appID {
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
oConn.Close()
}
return true
})

View File

@@ -10,9 +10,10 @@ import (
"time"
)
const OpenP2PVersion = "3.6.5"
const OpenP2PVersion = "3.9.1"
const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0"
const SyncServerTimeVersion = "3.9.0"
const (
IfconfigPort1 = 27180
@@ -96,6 +97,7 @@ const (
MsgPushEditNode = 12
MsgPushAPPKey = 13
MsgPushReportLog = 14
MsgPushDstNodeOnline = 15
)
// MsgP2P sub type message
@@ -137,18 +139,21 @@ const (
TunnelIdleTimeout = time.Minute
SymmetricHandshakeNum = 800 // 0.992379
// SymmetricHandshakeNum = 1000 // 0.999510
SymmetricHandshakeInterval = time.Millisecond
SymmetricHandshakeAckTimeout = time.Second * 11
PeerAddRelayTimeount = time.Second * 20
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10
MaxDirectTry = 3
SymmetricHandshakeInterval = time.Millisecond
HandshakeTimeout = time.Second * 5
PeerAddRelayTimeount = HandshakeTimeout * 2
CheckActiveTimeout = time.Second * 5
PaddingSize = 16
AESKeySize = 16
MaxRetry = 10
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 5
UDPReadTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10
UnderlayConnectTimeout = time.Second * 10
MaxDirectTry = 3
PunchTsDelay = time.Second * 2
)
// NATNone has public ip
@@ -223,6 +228,9 @@ type PushConnectReq struct {
LinkMode string `json:"linkMode,omitempty"`
IsUnderlayServer int `json:"isServer,omitempty"` // Requset spec peer is server
}
type PushDstNodeOnline struct {
Node string `json:"node,omitempty"`
}
type PushConnectRsp struct {
Error int `json:"error,omitempty"`
From string `json:"from,omitempty"`
@@ -235,6 +243,7 @@ type PushConnectRsp struct {
ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole
FromIP string `json:"fromIP,omitempty"`
ID uint64 `json:"id,omitempty"`
PunchTs uint64 `json:"punchts,omitempty"` // server timestamp
Version string `json:"version,omitempty"`
}
type PushRsp struct {

View File

@@ -1,35 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
)
const TOTPStep = 30 // 30s
func GenTOTP(token uint64, ts int64) uint64 {
step := ts / TOTPStep
tbuff := make([]byte, 8)
binary.LittleEndian.PutUint64(tbuff, token)
mac := hmac.New(sha256.New, tbuff)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(step))
mac.Write(b)
num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8])
// fmt.Printf("%x\n", mac.Sum(nil))
return num
}
func VerifyTOTP(code uint64, token uint64, ts int64) bool {
if code == 0 {
return false
}
if code == token {
return true
}
if code == GenTOTP(token, ts) || code == GenTOTP(token, ts-TOTPStep) || code == GenTOTP(token, ts+TOTPStep) {
return true
}
return false
}

View File

@@ -1,36 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"testing"
"time"
)
func TestTOTP(t *testing.T) {
for i := 0; i < 20; i++ {
ts := time.Now().Unix()
code := GenTOTP(13666999958022769123, ts)
t.Log(code)
if !VerifyTOTP(code, 13666999958022769123, ts) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts-10) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts+10) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769123, ts+60) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769124, ts+1) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769125, ts+1) {
t.Error("TOTP error")
}
time.Sleep(time.Second)
t.Log("round", i, " ", ts, " test ok")
}
}

View File

@@ -18,9 +18,9 @@ func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16,
return conn.WriteTo(msg, dst)
}
func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
func UDPRead(conn *net.UDPConn, timeout time.Duration) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) {
if timeout > 0 {
deadline := time.Now().Add(time.Millisecond * time.Duration(timeout))
deadline := time.Now().Add(timeout)
err = conn.SetReadDeadline(deadline)
if err != nil {
gLog.Println(LvERROR, "SetReadDeadline error")

View File

@@ -15,10 +15,10 @@ import (
"sync"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/quic-go/quic-go"
)
//quic.DialContext do not support version 44,disable it
// quic.DialContext do not support version 44,disable it
var quicVersion []quic.VersionNumber
type underlayQUIC struct {
@@ -87,7 +87,7 @@ func (conn *underlayQUIC) CloseListener() {
}
func (conn *underlayQUIC) Accept() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout)
defer cancel()
sess, err := conn.listener.Accept(ctx)
if err != nil {

View File

@@ -67,21 +67,30 @@ func (conn *underlayTCP) Close() error {
return conn.Conn.Close()
}
func listenTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) {
func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) {
if mode == LinkModeTCPPunch {
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) // TODO: timeout
if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS {
gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion)
} else {
ts := time.Duration(int64(t.punchTs) + t.pn.dt - time.Now().UnixNano())
gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond)
time.Sleep(ts)
}
gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port))
c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
if err != nil {
gLog.Println(LvDEBUG, "send tcp punch: ", err)
return nil, err
}
return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil
}
t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil)
addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort))
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout))
l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept()
defer l.Close()
if err != nil {
@@ -94,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e
var c net.Conn
var err error
if mode == LinkModeTCPPunch {
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout)
c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
} else {
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout)
c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout)
}
if err != nil {

View File

@@ -73,7 +73,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
return nil, err
}
defer l.Close()
l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout))
l.SetDeadline(time.Now().Add(HandshakeTimeout))
c, err := l.Accept()
defer l.Close()
if err != nil {
@@ -83,7 +83,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) {
}
func dialTCP6(host string, port int) (*underlayTCP6, error) {
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), SymmetricHandshakeAckTimeout)
c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout)
if err != nil {
gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err)
return nil, err

View File

@@ -27,7 +27,7 @@ func update(host string, port int) {
}
goos := runtime.GOOS
goarch := runtime.GOARCH
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s", host, port, OpenP2PVersion, goos, goarch))
rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node))
if err != nil {
gLog.Println(LvERROR, "update:query update list failed:", err)
return

View File

@@ -5,6 +5,7 @@ package openp2p
import (
"bytes"
"crypto/tls"
"encoding/xml"
"errors"
"fmt"
@@ -181,7 +182,12 @@ func localIPv4() string { // TODO: multi nic will wrong
}
func getServiceURL(rootURL string) (url, urnDomain string, err error) {
r, err := http.Get(rootURL)
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Timeout: time.Second * 3}
r, err := client.Get(rootURL)
if err != nil {
return
}

69
core/util_freebsd.go Normal file
View File

@@ -0,0 +1,69 @@
package openp2p
import (
"bufio"
"bytes"
"io/ioutil"
"os"
"runtime"
"strings"
"syscall"
)
const (
defaultInstallPath = "/usr/local/openp2p"
defaultBinName = "openp2p"
)
func getOsName() (osName string) {
var sysnamePath string
sysnamePath = "/etc/redhat-release"
_, err := os.Stat(sysnamePath)
if err != nil && os.IsNotExist(err) {
str := "PRETTY_NAME="
f, err := os.Open("/etc/os-release")
if err == nil {
buf := bufio.NewReader(f)
for {
line, err := buf.ReadString('\n')
if err == nil {
line = strings.TrimSpace(line)
pos := strings.Count(line, str)
if pos > 0 {
len1 := len([]rune(str)) + 1
rs := []rune(line)
osName = string(rs[len1 : (len(rs))-1])
break
}
} else {
break
}
}
}
} else {
buff, err := ioutil.ReadFile(sysnamePath)
if err == nil {
osName = string(bytes.TrimSpace(buff))
}
}
if osName == "" {
osName = "FreeBSD"
}
return
}
func setRLimit() error {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 65536
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
return nil
}
func setFirewall() {
}

12
docker/Dockerfile Executable file
View File

@@ -0,0 +1,12 @@
FROM alpine:3.18.2
# Replace the default Alpine repositories with Aliyun mirrors
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \
apk add --no-cache ca-certificates && \
rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/*
COPY get-client.sh /
RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh
ENTRYPOINT ["/openp2p"]

43
docker/get-client.sh Executable file
View File

@@ -0,0 +1,43 @@
#!/bin/sh
echo "Running on platform: $TARGETPLATFORM"
# TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/')
echo "Running on platform: $TARGETPLATFORM"
sysType="linux-amd64"
archType=$(uname -m)
if [[ $archType == aarch64 ]] ;
then
sysType="linux-arm64"
elif [[ $archType == arm* ]] ;
then
sysType="linux-arm"
elif [[ $archType == i*86 ]] ;
then
sysType="linux-386"
elif [[ $archType == mips ]] ;
then
sysType="linux-mipsle"
ls /lib |grep mipsel
if [[ $? -ne 0 ]]; then
# mipsel not found, it's mipseb
sysType="linux-mipsbe"
fi
fi
url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz"
echo "download $url start"
if [ -f /usr/bin/curl ]; then
curl -k -o openp2p.tar.gz $url
else
wget --no-check-certificate -O openp2p.tar.gz $url
fi
if [ $? -ne 0 ]; then
echo "download error $?"
exit 9
fi
echo "download ok"
tar -xzvf openp2p.tar.gz
chmod +x openp2p
pwd
ls -l
exit 0

26
go.mod
View File

@@ -4,27 +4,25 @@ go 1.18
require (
github.com/gorilla/websocket v1.4.2
github.com/lucas-clemente/quic-go v0.27.0
github.com/openp2p-cn/go-reuseport v0.3.2
github.com/openp2p-cn/service v1.0.0
github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
github.com/quic-go/quic-go v0.34.0
golang.org/x/sys v0.5.0
)
require (
github.com/cheekybits/genny v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/kardianos/service v1.2.2 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/tools v0.1.12 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/tools v0.2.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)