Compare commits

..

5 Commits

Author SHA1 Message Date
TenderIronh
cd415e7bf4 3.6.11 2023-03-25 12:00:27 +08:00
TenderIronh
67e3a8915a 3.6.8 2023-03-22 23:11:38 +08:00
TenderIronh
791d910314 3.6.5 2023-03-05 00:44:22 +08:00
TenderIronh
c3a43be3cc improve gatway and p2papp reconnect 2022-11-25 23:55:33 +08:00
TenderIronh
c8b8bf05a5 support openwrt and improve app and gateway reconnect time 2022-11-18 23:19:47 +08:00
23 changed files with 337 additions and 212 deletions

View File

@@ -101,6 +101,22 @@ cd到代码根目录执行
```
make
```
手动编译特定系统和架构
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
比如linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
近期计划:
@@ -110,12 +126,12 @@ make
4. ~~建立网站用户可以在网站管理所有P2PApp和设备。查看设备在线状态升级增删查改重启P2PApp等~~(100%)
5. 建立公众号用户可在微信公众号管理所有P2PApp和设备
6. 客户端提供WebUI
7. 支持自有服务器,开源服务器程序
7. ~~支持自有服务器,开源服务器程序~~(100%)
8. 共享节点调度模型优化,对不同的运营商优化
9. 方便二次开发提供API和lib
10. 应用层支持UDP协议实现很简单但UDP应用较少暂不急(100%)
10. ~~应用层支持UDP协议实现很简单但UDP应用较少暂不急~~(100%)
11. 底层通信支持KCP协议目前仅支持QuicKCP专门对延时优化被游戏加速器广泛使用可以牺牲一定的带宽降低延时
12. 支持Android系统让旧手机焕发青春变成移动网关
12. ~~支持Android系统让旧手机焕发青春变成移动网关~~(100%)
13. 支持Windows网上邻居共享文件
14. 内网直连优化,用处不大,估计就用户测试时用到
15. ~~支持UPNP~~(100%)

View File

@@ -109,6 +109,23 @@ cd root directory of the socure code and execute
make
```
build specified os and arch.
All GOOS values:
```
"aix", "android", "darwin", "dragonfly", "freebsd", "hurd", "illumos", "ios", "js", "linux", "nacl", "netbsd", "openbsd", "plan9", "solaris", "windows", "zos"
```
All GOARCH values:
```
"386", "amd64", "amd64p32", "arm", "arm64", "arm64be", "armbe", "loong64", "mips", "mips64", "mips64le", "mips64p32", "mips64p32le", "mipsle", "ppc", "ppc64", "ppc64le", "riscv", "riscv64", "s390", "s390x", "sparc", "sparc64", "wasm"
```
For example linux+amd64
```
export GOPROXY=https://goproxy.io,direct
go mod tidy
CGO_ENABLED=0 env GOOS=linux GOARCH=amd64 go build -o openp2p --ldflags '-s -w ' -gcflags '-l' -p 8 -installsuffix cgo ./cmd
```
## RoadMap
Short-Term:
1. ~~Support IPv6.~~(100%)
@@ -117,12 +134,12 @@ Short-Term:
4. ~~Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .~~(100%)
5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website.
6. Provide WebUI on client side.
7. Support private server, open source server program.
7. ~~Support private server, open source server program.~~(100%)
8. Optimize our share scheduling model for different network operators.
9. Provide REST APIs and libary for secondary development.
10. ~~Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.~~(100%)
11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag.
12. Support Android platform, let the phones to be mobile gateway.
12. ~~Support Android platform, let the phones to be mobile gateway.~~(100%)
13. Support SMB Windows neighborhood.
14. Direct connection on intranet, for testing.
15. ~~Support UPNP.~~(100%)

17
app/README.md Normal file
View File

@@ -0,0 +1,17 @@
## Build
```
cd core
go get -v golang.org/x/mobile/bind
gomobile bind -target android -v
if [[ $? -ne 0 ]]; then
echo "build error"
exit 9
fi
echo "build ok"
cp openp2p.aar openp2p-sources.jar ../app/app/libs
echo "copy to APP libs"
cd ../app
./gradlew build
```

View File

@@ -20,7 +20,6 @@ import (
const MinNodeNameLen = 8
func getmac(ip string) string {
//get mac relative to the ip address which connected to the mq.
ifaces, err := net.Interfaces()
if err != nil {
return ""

View File

@@ -62,6 +62,16 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
}
}
}
func (c *Config) retryApp(peerNode string) {
c.mtx.Lock()
defer c.mtx.Unlock()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].PeerNode == peerNode {
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
}
}
}
func (c *Config) add(app AppConfig, override bool) {
c.mtx.Lock()
@@ -128,10 +138,13 @@ func (c *Config) load() error {
return err
}
// TODO: deal with multi-thread r/w
func (c *Config) setToken(token uint64) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.Network.Token = token
if token != 0 {
c.Network.Token = token
}
}
func (c *Config) setUser(user string) {
c.mtx.Lock()
@@ -229,7 +242,7 @@ func parseParams(subCommand string) {
gConf.Network.TCPPort = *tcpPort
}
if f.Name == "token" {
gConf.Network.Token = *token
gConf.setToken(*token)
}
})

View File

@@ -6,7 +6,7 @@ import (
"path/filepath"
"time"
"github.com/kardianos/service"
"github.com/openp2p-cn/service"
)
type daemon struct {
@@ -44,9 +44,9 @@ func (d *daemon) run() {
}
gLog.Println(LvINFO, mydir)
conf := &service.Config{
Name: ProducnName,
DisplayName: ProducnName,
Description: ProducnName,
Name: ProductName,
DisplayName: ProductName,
Description: ProductName,
Executable: binPath,
}
@@ -95,9 +95,9 @@ func (d *daemon) run() {
func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) error {
svcConfig := &service.Config{
Name: ProducnName,
DisplayName: ProducnName,
Description: ProducnName,
Name: ProductName,
DisplayName: ProductName,
Description: ProductName,
Executable: exeAbsPath,
Arguments: args,
}

View File

@@ -8,12 +8,13 @@ import (
var (
// ErrorS2S string = "s2s is not supported"
// ErrorHandshake string = "handshake error"
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
)

View File

@@ -9,6 +9,8 @@ import (
"os/exec"
"path/filepath"
"time"
"github.com/openp2p-cn/totp"
)
func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
@@ -26,7 +28,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvINFO, "%s is connecting...", req.From)
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
@@ -40,8 +42,9 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
return ErrVersionNotCompatible
}
// verify totp token or token
if VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts
VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()) {
t := totp.TOTP{Step: totp.RelayTOTPStep}
if t.Verify(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts
t.Verify(req.Token, pn.config.Token, time.Now().Unix()) {
gLog.Printf(LvINFO, "Access Granted\n")
config := AppConfig{}
config.peerNatType = req.NatType
@@ -101,7 +104,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
msg := TunnelMsg{ID: t.id}
pn.push(r.From, MsgPushAddRelayTunnelRsp, msg)
}
}(req)
case MsgPushAPPKey:
req := APPKeySync{}
@@ -174,7 +176,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
}
pn.write(MsgReport, MsgReportApps, &req)
case MsgPushReportLog:
gLog.Println(LvINFO, "MsgPushReportLog")
gLog.Println(LvDEBUG, "MsgPushReportLog")
req := ReportLogReq{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &req)
if err != nil {
@@ -272,6 +274,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]

View File

@@ -67,11 +67,11 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64,
os.MkdirAll(logdir, 0777)
for lv := range logFileNames {
logFilePath := logdir + filePrefix + logFileNames[lv]
f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Fatal(err)
}
os.Chmod(logFilePath, 0666)
os.Chmod(logFilePath, 0644)
logfiles[lv] = f
loggers[lv] = log.New(f, "", log.LstdFlags)
}
@@ -119,7 +119,7 @@ func (l *logger) checkFile() {
backupPath := l.logDir + fname + ".0"
os.Remove(backupPath)
os.Rename(l.logDir+fname, backupPath)
newFile, e := os.OpenFile(l.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
newFile, e := os.OpenFile(l.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if e == nil {
l.loggers[lv].SetOutput(newFile)
l.files[lv] = newFile

View File

@@ -91,7 +91,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasIPvr int, hasUPNPorNATPMP int, err error) {
// the random local port may be used by other.
localPort := int(rand.Uint32()%15000 + 50000)
echoPort := P2PNetworkInstance(nil).config.TCPPort
echoPort := gConf.Network.TCPPort
ip1, port1, err := natTest(host, udp1, localPort)
if err != nil {
return "", 0, 0, 0, err

View File

@@ -13,7 +13,7 @@ func Run() {
rand.Seed(time.Now().UnixNano())
baseDir := filepath.Dir(os.Args[0])
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
// TODO: install sub command, deamon process
if len(os.Args) > 1 {
switch os.Args[1] {
@@ -21,7 +21,7 @@ func Run() {
fmt.Println(OpenP2PVersion)
return
case "update":
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
d := daemon{}
err := d.Control("restart", targetPath, nil)
@@ -53,6 +53,10 @@ func Run() {
gLog.Println(LvINFO, &gConf)
setFirewall()
err := setRLimit()
if err != nil {
gLog.Println(LvINFO, "setRLimit error:", err)
}
network := P2PNetworkInstance(&gConf.Network)
if ok := network.Connect(30000); !ok {
gLog.Println(LvERROR, "P2PNetwork login error")
@@ -70,7 +74,7 @@ var network *P2PNetwork
func RunAsModule(baseDir string, token string, bw int, logLevel int) *P2PNetwork {
rand.Seed(time.Now().UnixNano())
os.Chdir(baseDir) // for system service
gLog = NewLogger(baseDir, ProducnName, LvDEBUG, 1024*1024, LogFileAndConsole)
gLog = NewLogger(baseDir, ProductName, LvDEBUG, 1024*1024, LogFileAndConsole)
parseParams("")

View File

@@ -35,24 +35,23 @@ type overlayConn struct {
// for udp
connUDP *net.UDPConn
remoteAddr net.Addr
udpRelayData chan []byte
udpData chan []byte
lastReadUDPTs time.Time
}
func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "%d overlayConn run start", oConn.id)
defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id)
oConn.running = true
oConn.lastReadUDPTs = time.Now()
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
buffer := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
reuseBuff := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
for oConn.running && oConn.tunnel.isRuning() {
buff, dataLen, err := oConn.Read(readBuf)
readBuff, dataLen, err := oConn.Read(reuseBuff)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
@@ -61,9 +60,9 @@ func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
break
}
payload := buff[:dataLen]
payload := readBuff[:dataLen]
if oConn.appKey != 0 {
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, readBuff[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
@@ -98,7 +97,11 @@ func (oConn *overlayConn) run() {
}
}
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) {
if !oConn.running {
err = ErrOverlayConnDisconnect
return
}
if oConn.connUDP != nil {
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
err = errors.New("udp close")
@@ -106,15 +109,15 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
if oConn.remoteAddr != nil { // as server
select {
case buff = <-oConn.udpRelayData:
n = len(buff)
case buff = <-oConn.udpData:
dataLen = len(buff) - PaddingSize
oConn.lastReadUDPTs = time.Now()
case <-time.After(time.Second * 10):
err = ErrDeadlineExceeded
}
} else { // as client
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
dataLen, _, err = oConn.connUDP.ReadFrom(reuseBuff)
if err == nil {
oConn.lastReadUDPTs = time.Now()
}
@@ -122,15 +125,21 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error)
}
return
}
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
if oConn.connTCP != nil {
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
dataLen, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
}
return
}
// calling by p2pTunnel
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
if !oConn.running {
return 0, ErrOverlayConnDisconnect
}
if oConn.connUDP != nil {
if oConn.remoteAddr == nil {
n, err = oConn.connUDP.Write(buff)
@@ -142,9 +151,25 @@ func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
}
return
}
n, err = oConn.connTCP.Write(buff)
if oConn.connTCP != nil {
n, err = oConn.connTCP.Write(buff)
}
if err != nil {
oConn.running = false
}
return
}
func (oConn *overlayConn) Close() (err error) {
oConn.running = false
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
return nil
}

View File

@@ -51,7 +51,7 @@ func (app *p2pApp) listenTCP() error {
gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort)
var err error
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
app.listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) // support tcp4 and tcp6
if err != nil {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
@@ -72,6 +72,7 @@ func (app *p2pApp) listenTCP() error {
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// pre-calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -100,6 +101,8 @@ func (app *p2pApp) listenTCP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
}
return nil
@@ -114,7 +117,7 @@ func (app *p2pApp) listenUDP() error {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
}
buffer := make([]byte, 64*1024)
buffer := make([]byte, 64*1024+PaddingSize)
udpID := make([]byte, 8)
for {
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
@@ -127,8 +130,8 @@ func (app *p2pApp) listenUDP() error {
break
}
} else {
b := bytes.Buffer{}
b.Write(buffer[:len])
dupData := bytes.Buffer{} // should uses memory pool
dupData.Write(buffer[:len+PaddingSize])
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
@@ -139,19 +142,20 @@ func (app *p2pApp) listenUDP() error {
udpID[3] = a[3]
udpID[4] = byte(port)
udpID[5] = byte(port >> 8)
id := binary.LittleEndian.Uint64(udpID)
id := binary.LittleEndian.Uint64(udpID) // convert remoteIP:port to uint64
s, ok := app.tunnel.overlayConns.Load(id)
if !ok {
oConn := overlayConn{
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpRelayData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
running: true,
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
@@ -180,8 +184,10 @@ func (app *p2pApp) listenUDP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
// TODO: wait OverlayConnectRsp instead of sleep
time.Sleep(time.Second) // waiting remote node connection ok
go oConn.run()
oConn.udpRelayData <- b.Bytes()
oConn.udpData <- dupData.Bytes()
}
// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
@@ -189,7 +195,7 @@ func (app *p2pApp) listenUDP() error {
if !ok {
continue
}
overlayConn.udpRelayData <- b.Bytes()
overlayConn.udpData <- dupData.Bytes()
}
}
return nil
@@ -204,12 +210,15 @@ func (app *p2pApp) listen() error {
if app.rtid != 0 {
go app.relayHeartbeatLoop()
}
for app.tunnel.isRuning() && app.running {
for app.tunnel.isRuning() {
if app.config.Protocol == "udp" {
app.listenUDP()
} else {
app.listenTCP()
}
if !app.running {
break
}
time.Sleep(time.Second * 10)
}
return nil

View File

@@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
@@ -23,16 +22,21 @@ var (
once sync.Once
)
const (
retryLimit = 20
retryInterval = 10 * time.Second
)
type P2PNetwork struct {
conn *websocket.Conn
online bool
running bool
restartCh chan bool
wg sync.WaitGroup
writeMtx sync.Mutex
serverTs int64
localTs int64
hbTime time.Time
conn *websocket.Conn
online bool
running bool
restartCh chan bool
wgReconnect sync.WaitGroup
writeMtx sync.Mutex
serverTs int64
localTs int64
hbTime time.Time
// msgMap sync.Map
msgMap map[uint64]chan []byte //key: nodeID
msgMapMtx sync.Mutex
@@ -64,7 +68,6 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
}
func (pn *P2PNetwork) run() {
go pn.autorunApp()
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
for pn.running {
select {
@@ -73,8 +76,8 @@ func (pn *P2PNetwork) run() {
case <-pn.restartCh:
pn.online = false
pn.wg.Wait() // wait read/write goroutine exited
time.Sleep(NetworkHeartbeatTime)
pn.wgReconnect.Wait() // wait read/autorunapp goroutine end
time.Sleep(ClientAPITimeout)
err := pn.init()
if err != nil {
gLog.Println(LvERROR, "P2PNetwork init error:", err)
@@ -121,41 +124,40 @@ func (pn *P2PNetwork) runAll() {
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum > 0 {
if config.retryNum >= retryLimit {
continue
}
if config.retryNum > 0 { // first time not show reconnect log
gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // run normally 15min, reset retrynum
config.retryNum = 0
}
}
config.retryNum++
config.retryTime = time.Now()
increase := math.Pow(1.5, float64(config.retryNum)) // exponential increase retry time. 1.5^x
if increase > 900 {
increase = 900
config.Enabled = 0
gLog.Printf(LvWARN, "app %s has stopped retry, manually enable it on Web console", config.AppName)
continue
}
config.nextRetryTime = time.Now().Add(time.Second * time.Duration(increase))
config.nextRetryTime = time.Now().Add(retryInterval)
config.connectTime = time.Now()
config.peerToken = pn.config.Token
gConf.mtx.Unlock() // AddApp will take a period of time
gConf.mtx.Unlock() // AddApp will take a period of time, let outside modify gConf
err := pn.AddApp(*config)
gConf.mtx.Lock()
if err != nil {
config.errMsg = err.Error()
if err == ErrPeerOffline { // stop retry, waiting for online
config.retryNum = retryLimit
gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", config.PeerNode)
}
}
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LvINFO, "autorunApp start")
for pn.running {
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running && pn.online {
time.Sleep(time.Second)
if !pn.online {
continue
}
pn.runAll()
time.Sleep(time.Second * 10)
}
gLog.Println(LvINFO, "autorunApp end")
}
@@ -369,9 +371,10 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
initErr := t.requestPeerInfo()
if initErr != nil {
gLog.Println(LvERROR, "init error:", initErr)
return nil, initErr
}
err := ErrorHandshake
var err error
// try TCP6
if IsIPv6(t.config.peerIPv6) && IsIPv6(t.pn.config.publicIPv6) {
gLog.Println(LvINFO, "try TCP6")
@@ -417,7 +420,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel,
return t, nil
}
}
return nil, err
return nil, ErrorHandshake // only ErrorHandshake will try relay
}
func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
@@ -440,6 +443,8 @@ func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error {
}
func (pn *P2PNetwork) init() error {
gLog.Println(LvINFO, "init start")
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
var err error
for {
// detect nat type
@@ -449,12 +454,14 @@ func (pn *P2PNetwork) init() error {
pn.config.natType = NATSymmetric
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pS2STest debug")
}
if strings.Contains(pn.config.Node, "openp2pC2CTest") {
pn.config.natType = NATCone
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pC2CTest debug")
}
if err != nil {
gLog.Println(LvDEBUG, "detect NAT type error:", err)
@@ -490,27 +497,29 @@ func (pn *P2PNetwork) init() error {
go pn.readLoop()
pn.config.mac = getmac(pn.config.localIP)
pn.config.os = getOsName()
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
pn.config.publicIPv6 = rsp.IP.String()
go func() {
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = pn.config.publicIPv6
pn.write(MsgReport, MsgReportBasic, &req)
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
pn.config.publicIPv6 = rsp.IP.String()
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = pn.config.publicIPv6
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
gLog.Println(LvDEBUG, "P2PNetwork init ok")
break
}
@@ -550,6 +559,7 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
gConf.setUser(rsp.User)
if len(rsp.Node) >= MinNodeNameLen {
gConf.setNode(rsp.Node)
pn.config.Node = rsp.Node
}
gConf.save()
pn.localTs = time.Now().Unix()
@@ -571,8 +581,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
func (pn *P2PNetwork) readLoop() {
gLog.Printf(LvDEBUG, "P2PNetwork readLoop start")
pn.wg.Add(1)
defer pn.wg.Done()
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
for pn.running {
pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second))
t, msg, err := pn.conn.ReadMessage()

View File

@@ -106,7 +106,7 @@ func (t *P2PTunnel) connect() error {
AppKey: appKey,
Version: OpenP2PVersion,
LinkMode: t.config.linkMode,
IsUnderlayServer: t.config.isUnderlayServer ^ 1,
IsUnderlayServer: t.config.isUnderlayServer ^ 1, // peer
}
if req.Token == 0 { // no relay token
req.Token = t.pn.config.Token
@@ -154,6 +154,9 @@ func (t *P2PTunnel) setRun(running bool) {
}
func (t *P2PTunnel) isActive() bool {
if !t.isRuning() {
return false
}
t.hbMtx.Lock()
defer t.hbMtx.Unlock()
return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout))
@@ -441,7 +444,7 @@ func (t *P2PTunnel) readLoop() {
continue
}
overlayID := binary.LittleEndian.Uint64(body[:8])
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d", t.id, overlayID)
gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d bodylen=%d", t.id, overlayID, head.DataLen)
s, ok := t.overlayConns.Load(overlayID)
if !ok {
// debug level, when overlay connection closed, always has some packet not found tunnel
@@ -512,6 +515,7 @@ func (t *P2PTunnel) readLoop() {
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
running: true,
}
if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
@@ -525,7 +529,7 @@ func (t *P2PTunnel) readLoop() {
// calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, 16)
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
@@ -545,7 +549,7 @@ func (t *P2PTunnel) readLoop() {
i, ok := t.overlayConns.Load(overlayID)
if ok {
oConn := i.(*overlayConn)
oConn.running = false
oConn.Close()
}
default:
}
@@ -607,14 +611,7 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) {
t.overlayConns.Range(func(_, i interface{}) bool {
oConn := i.(*overlayConn)
if oConn.appID == appID {
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
oConn.Close()
}
return true
})

View File

@@ -10,8 +10,8 @@ import (
"time"
)
const OpenP2PVersion = "3.5.2"
const ProducnName string = "openp2p"
const OpenP2PVersion = "3.6.11"
const ProductName string = "openp2p"
const LeastSupportVersion = "3.0.0"
const (
@@ -96,6 +96,7 @@ const (
MsgPushEditNode = 12
MsgPushAPPKey = 13
MsgPushReportLog = 14
MsgPushDstNodeOnline = 15
)
// MsgP2P sub type message
@@ -146,7 +147,7 @@ const (
MaxRetry = 10
RetryInterval = time.Second * 30
PublicIPEchoTimeout = time.Second * 1
NatTestTimeout = time.Second * 10
NatTestTimeout = time.Second * 5
ClientAPITimeout = time.Second * 10
MaxDirectTry = 3
)
@@ -223,6 +224,9 @@ type PushConnectReq struct {
LinkMode string `json:"linkMode,omitempty"`
IsUnderlayServer int `json:"isServer,omitempty"` // Requset spec peer is server
}
type PushDstNodeOnline struct {
Node string `json:"node,omitempty"`
}
type PushConnectRsp struct {
Error int `json:"error,omitempty"`
From string `json:"from,omitempty"`

View File

@@ -1,35 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
)
const TOTPStep = 30 // 30s
func GenTOTP(token uint64, ts int64) uint64 {
step := ts / TOTPStep
tbuff := make([]byte, 8)
binary.LittleEndian.PutUint64(tbuff, token)
mac := hmac.New(sha256.New, tbuff)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(step))
mac.Write(b)
num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8])
// fmt.Printf("%x\n", mac.Sum(nil))
return num
}
func VerifyTOTP(code uint64, token uint64, ts int64) bool {
if code == 0 {
return false
}
if code == token {
return true
}
if code == GenTOTP(token, ts) || code == GenTOTP(token, ts-TOTPStep) || code == GenTOTP(token, ts+TOTPStep) {
return true
}
return false
}

View File

@@ -1,36 +0,0 @@
// Time-based One-time Password
package openp2p
import (
"testing"
"time"
)
func TestTOTP(t *testing.T) {
for i := 0; i < 20; i++ {
ts := time.Now().Unix()
code := GenTOTP(13666999958022769123, ts)
t.Log(code)
if !VerifyTOTP(code, 13666999958022769123, ts) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts-10) {
t.Error("TOTP error")
}
if !VerifyTOTP(code, 13666999958022769123, ts+10) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769123, ts+60) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769124, ts+1) {
t.Error("TOTP error")
}
if VerifyTOTP(code, 13666999958022769125, ts+1) {
t.Error("TOTP error")
}
time.Sleep(time.Second)
t.Log("round", i, " ", ts, " test ok")
}
}

View File

@@ -21,7 +21,7 @@ func setRLimit() error {
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Cur = 10240
limit.Cur = 65536
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}

69
core/util_freebsd.go Normal file
View File

@@ -0,0 +1,69 @@
package openp2p
import (
"bufio"
"bytes"
"io/ioutil"
"os"
"runtime"
"strings"
"syscall"
)
const (
defaultInstallPath = "/usr/local/openp2p"
defaultBinName = "openp2p"
)
func getOsName() (osName string) {
var sysnamePath string
sysnamePath = "/etc/redhat-release"
_, err := os.Stat(sysnamePath)
if err != nil && os.IsNotExist(err) {
str := "PRETTY_NAME="
f, err := os.Open("/etc/os-release")
if err == nil {
buf := bufio.NewReader(f)
for {
line, err := buf.ReadString('\n')
if err == nil {
line = strings.TrimSpace(line)
pos := strings.Count(line, str)
if pos > 0 {
len1 := len([]rune(str)) + 1
rs := []rune(line)
osName = string(rs[len1 : (len(rs))-1])
break
}
} else {
break
}
}
}
} else {
buff, err := ioutil.ReadFile(sysnamePath)
if err == nil {
osName = string(bytes.TrimSpace(buff))
}
}
if osName == "" {
osName = "FreeBSD"
}
return
}
func setRLimit() error {
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 65536
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
return nil
}
func setFirewall() {
}

View File

@@ -64,7 +64,7 @@ func setRLimit() error {
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err
}
limit.Max = 1024 * 1024
limit.Max = 65536
limit.Cur = limit.Max
if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
return err

View File

@@ -45,9 +45,9 @@ func setFirewall() {
}
if isXP {
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall del allowedprogram "%s"`, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProducnName, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProductName, fullPath)).Run()
} else { // win7 or later
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProducnName)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProducnName, fullPath)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProductName)).Run()
exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProductName, fullPath)).Run()
}
}

5
go.mod
View File

@@ -4,9 +4,10 @@ go 1.18
require (
github.com/gorilla/websocket v1.4.2
github.com/kardianos/service v1.2.0
github.com/lucas-clemente/quic-go v0.27.0
github.com/openp2p-cn/go-reuseport v0.3.2
github.com/openp2p-cn/service v1.0.0
github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
)
@@ -14,6 +15,7 @@ require (
github.com/cheekybits/genny v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/kardianos/service v1.2.2 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
@@ -23,5 +25,6 @@ require (
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)