Compare commits

..

2 Commits

Author SHA1 Message Date
TenderIronh
26e0fdf605 support udp 2022-02-26 18:50:57 +08:00
TenderIronh
3653ec19cd 1.2.0 2022-02-22 15:50:30 +08:00
19 changed files with 430 additions and 237 deletions

View File

@@ -40,11 +40,6 @@ P2P直连可以让你的设备跑满带宽。不论你的设备在任何网络
![image](/doc/images/install.png)
Windows默认会阻止没有花钱买它家证书签名过的程序选择“仍要运行”即可。
![image](/doc/images/win10warn.png)
![image](/doc/images/stillrun.png)
### 3.新建P2P应用
![image](/doc/images/devices.png)

View File

@@ -43,11 +43,6 @@ Download on local and remote computers and double-click to run, one-click instal
![image](/doc/images/install_en.png)
By default, Windows will block programs that have not been signed by the Microsoft's certificate, and you can select "Run anyway".
![image](/doc/images/win10warn_en.png)
![image](/doc/images/stillrun_en.png)
### 3.New P2PApp
![image](/doc/images/devices_en.png)

View File

@@ -17,6 +17,12 @@
>* -sharebandwidth: 作为共享节点时提供带宽默认10mbps. 如果是光纤大带宽,设置越大效果越好. 0表示不共享该节点只在私有的P2P网络使用。不加入共享的P2P网络这样也意味着无法使用别人的共享节点
>* -loglevel: 需要查看更多调试日志设置0默认是1
### 在docker容器里运行openp2p
我们暂时还没提供官方docker镜像你可以在随便一个容器里运行
```
nohup ./openp2p -d -node OFFICEPC1 -token TOKEN &
#这里由于一般的镜像都精简过install系统服务会失败所以使用直接daemon模式后台运行
```
## 连接
```
./openp2p -d -node HOMEPC123 -token TOKEN -appname OfficeWindowsRemote -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389

View File

@@ -19,6 +19,13 @@ Or
>* -sharebandwidth: Provides bandwidth when used as a shared node, the default is 10mbps. If it is a large bandwidth of optical fiber, the larger the setting, the better the effect. 0 means not shared, the node is only used in a private P2P network. Do not join the shared P2P network, which also means that you CAN NOT use other peoples shared nodes
>* -loglevel: Need to view more debug logs, set 0; the default is 1
### Run in Docker container
We don't provide official docker image yet, you can run it in any container
```
nohup ./openp2p -d -node OFFICEPC1 -token TOKEN &
# Since many docker images have been simplified, the install system service will fail, so the daemon mode is used to run in the background
```
## Connect
```
./openp2p -d -node HOMEPC123 -token TOKEN -appname OfficeWindowsRemote -peernode OFFICEPC1 -dstip 127.0.0.1 -dstport 3389 -srcport 23389

View File

@@ -30,13 +30,14 @@ type AppConfig struct {
peerConeNatPort int
retryNum int
retryTime time.Time
nextRetryTime time.Time
shareBandwidth int
}
// TODO: add loglevel, maxlogfilesize
type Config struct {
Network NetworkConfig `json:"network"`
Apps []AppConfig `json:"apps"`
Apps []*AppConfig `json:"apps"`
LogLevel int
mtx sync.Mutex
@@ -48,27 +49,29 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
c.Apps[i].Enabled = enabled
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
return
}
}
}
func (c *Config) add(app AppConfig, force bool) {
func (c *Config) add(app AppConfig, override bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
if app.SrcPort == 0 || app.DstPort == 0 {
gLog.Println(LevelERROR, "invalid app ", app)
return
}
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
if force {
c.Apps[i] = app
if override {
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].Protocol == app.Protocol && c.Apps[i].SrcPort == app.SrcPort {
c.Apps[i] = &app // override it
return
}
return
}
}
c.Apps = append(c.Apps, app)
c.Apps = append(c.Apps, &app)
}
func (c *Config) delete(app AppConfig) {
@@ -142,11 +145,8 @@ func parseParams() {
srcPort := flag.Int("srcport", 0, "source port ")
protocol := flag.String("protocol", "tcp", "tcp or udp")
appName := flag.String("appname", "", "app name")
flag.Bool("noshare", false, "deprecated. uses -sharebandwidth 0") // Deprecated, rm later
shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private node no limit")
flag.Bool("f", false, "deprecated. config file") // Deprecated, rm later
shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit")
daemonMode := flag.Bool("d", false, "daemonMode")
flag.Bool("bydaemon", false, "start by daemon") // Deprecated, rm later
logLevel := flag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
flag.Parse()

View File

@@ -110,9 +110,22 @@ func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) erro
// listen and build p2papp:
// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 -peernode hhdhome-n1 -dstip 127.0.0.1 -dstport 50022 -protocol tcp -srcport 22
func install() {
gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion)
gLog.Println(LevelINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com")
gLog.Println(LevelINFO, "install start")
defer gLog.Println(LevelINFO, "install end")
// auto uninstall
err := os.MkdirAll(defaultInstallPath, 0775)
if err != nil {
gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err)
return
}
err = os.Chdir(defaultInstallPath)
if err != nil {
gLog.Println(LevelERROR, "cd error:", err)
return
}
uninstall()
// save config file
@@ -127,23 +140,25 @@ func install() {
srcPort := installFlag.Int("srcport", 0, "source port ")
protocol := installFlag.String("protocol", "tcp", "tcp or udp")
appName := flag.String("appname", "", "app name")
installFlag.Bool("noshare", false, "deprecated. uses -sharebandwidth 0")
shareBandwidth := installFlag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private node no limit")
shareBandwidth := installFlag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit")
logLevel := installFlag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
installFlag.Parse(os.Args[2:])
if *node != "" && len(*node) < 8 {
gLog.Println(LevelERROR, ErrNodeTooShort)
os.Exit(9)
}
if *node == "" { // if node name not set. use os.Hostname
hostname := defaultNodeName()
node = &hostname
}
gConf.load() // load old config. otherwise will clear all apps
gConf.LogLevel = *logLevel
gConf.Network.ServerHost = *serverHost
gConf.Network.Token = *token
gConf.Network.Node = *node
if *node != "" {
if len(*node) < 8 {
gLog.Println(LevelERROR, ErrNodeTooShort)
os.Exit(9)
}
gConf.Network.Node = *node
} else {
if gConf.Network.Node == "" { // if node name not set. use os.Hostname
gConf.Network.Node = defaultNodeName()
}
}
gConf.Network.ServerPort = 27183
gConf.Network.UDPPort1 = 27182
gConf.Network.UDPPort2 = 27183
@@ -158,16 +173,6 @@ func install() {
if config.SrcPort != 0 {
gConf.add(config, true)
}
err := os.MkdirAll(defaultInstallPath, 0775)
if err != nil {
gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err)
return
}
err = os.Chdir(defaultInstallPath)
if err != nil {
gLog.Println(LevelERROR, "cd error:", err)
return
}
gConf.save()
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
d := daemon{}
@@ -195,7 +200,6 @@ func install() {
dst.Close()
// install system service
// args := []string{""}
gLog.Println(LevelINFO, "targetPath:", targetPath)
err = d.Control("install", targetPath, []string{"-d"})
if err == nil {

Binary file not shown.

Before

Width:  |  Height:  |  Size: 65 KiB

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 8.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 50 KiB

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

After

Width:  |  Height:  |  Size: 6.8 KiB

View File

@@ -110,23 +110,27 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
case MsgPushReportApps:
gLog.Println(LevelINFO, "MsgPushReportApps")
req := ReportApps{}
// TODO: add the retrying apps
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
appActive := 0
relayNode := ""
relayMode := ""
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
if app.isActive() {
appActive = 1
}
relayNode = app.relayNode
relayMode = app.relayMode
}
appInfo := AppInfo{
AppName: config.AppName,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
// RelayNode: relayNode,
AppName: config.AppName,
Protocol: config.Protocol,
SrcPort: config.SrcPort,
RelayNode: relayNode,
RelayMode: relayMode,
PeerNode: config.PeerNode,
DstHost: config.DstHost,
DstPort: config.DstPort,

4
log.go
View File

@@ -110,10 +110,10 @@ func (vl *V8log) checkFile() {
for l, logFile := range vl.files {
f, e := logFile.Stat()
if e != nil {
break
continue
}
if f.Size() <= vl.maxLogSize {
break
continue
}
logFile.Close()
fname := f.Name()

View File

@@ -13,7 +13,6 @@ func main() {
binDir := filepath.Dir(os.Args[0])
os.Chdir(binDir) // for system service
gLog = InitLogger(binDir, "openp2p", LevelDEBUG, 1024*1024, LogFileAndConsole)
gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion)
// TODO: install sub command, deamon process
if len(os.Args) > 1 {
switch os.Args[1] {
@@ -41,7 +40,8 @@ func main() {
} else {
installByFilename()
}
gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion)
gLog.Println(LevelINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com")
parseParams()
gLog.Println(LevelINFO, &gConf)
setFirewall()

150
overlay.go Normal file
View File

@@ -0,0 +1,150 @@
package main
import (
"bytes"
"encoding/binary"
"errors"
"net"
"time"
)
var ErrDeadlineExceeded error = &DeadlineExceededError{}
// DeadlineExceededError is returned for an expired deadline.
type DeadlineExceededError struct{}
// Implement the net.Error interface.
// The string is "i/o timeout" because that is what was returned
// by earlier Go versions. Changing it may break programs that
// match on error strings.
func (e *DeadlineExceededError) Error() string { return "i/o timeout" }
func (e *DeadlineExceededError) Timeout() bool { return true }
func (e *DeadlineExceededError) Temporary() bool { return true }
// implement io.Writer
type overlayConn struct {
tunnel *P2PTunnel
connTCP net.Conn
id uint64
rtid uint64
running bool
isClient bool
appID uint64
appKey uint64
appKeyBytes []byte
// for udp
connUDP *net.UDPConn
remoteAddr net.Addr
udpRelayData chan []byte
lastReadUDPTs time.Time
}
func (oConn *overlayConn) run() {
gLog.Printf(LevelDEBUG, "%d overlayConn run start", oConn.id)
defer gLog.Printf(LevelDEBUG, "%d overlayConn run end", oConn.id)
oConn.running = true
oConn.lastReadUDPTs = time.Now()
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
for oConn.running && oConn.tunnel.isRuning() {
buff, dataLen, err := oConn.Read(readBuf)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
}
// overlay tcp connection normal close, debug log
gLog.Printf(LevelDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
break
}
payload := buff[:dataLen]
if oConn.appKey != 0 {
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
gLog.Printf(LevelDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
} else {
// write raley data
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
all = append(all, writeBytes...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
}
}
if oConn.connTCP != nil {
oConn.connTCP.Close()
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
}
oConn.tunnel.overlayConns.Delete(oConn.id)
// notify peer disconnect
if oConn.isClient {
req := OverlayDisconnectReq{ID: oConn.id}
if oConn.rtid == 0 {
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
}
}
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
if oConn.connUDP != nil {
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
err = errors.New("udp close")
return
}
if oConn.remoteAddr != nil { // as server
select {
case buff = <-oConn.udpRelayData:
n = len(buff)
oConn.lastReadUDPTs = time.Now()
case <-time.After(time.Second * 10):
err = ErrDeadlineExceeded
}
} else { // as client
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
if err == nil {
oConn.lastReadUDPTs = time.Now()
}
buff = reuseBuff
}
return
}
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
return
}
// calling by p2pTunnel
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
if oConn.connUDP != nil {
if oConn.remoteAddr == nil {
n, err = oConn.connUDP.Write(buff)
} else {
n, err = oConn.connUDP.WriteTo(buff, oConn.remoteAddr)
}
if err != nil {
oConn.running = false
}
return
}
n, err = oConn.connTCP.Write(buff)
if err != nil {
oConn.running = false
}
return
}

View File

@@ -1,85 +0,0 @@
package main
import (
"bytes"
"encoding/binary"
"net"
"time"
)
// implement io.Writer
type overlayTCP struct {
tunnel *P2PTunnel
conn net.Conn
id uint64
rtid uint64
running bool
isClient bool
appID uint64
appKey uint64
appKeyBytes []byte
}
func (otcp *overlayTCP) run() {
gLog.Printf(LevelDEBUG, "%d overlayTCP run start", otcp.id)
defer gLog.Printf(LevelDEBUG, "%d overlayTCP run end", otcp.id)
otcp.running = true
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, otcp.rtid)
binary.Write(tunnelHead, binary.LittleEndian, otcp.id)
for otcp.running && otcp.tunnel.isRuning() {
otcp.conn.SetReadDeadline(time.Now().Add(time.Second * 5))
dataLen, err := otcp.conn.Read(readBuf)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
}
// overlay tcp connection normal close, debug log
gLog.Printf(LevelDEBUG, "overlayTCP %d read error:%s,close it", otcp.id, err)
break
} else {
payload := readBuf[:dataLen]
if otcp.appKey != 0 {
payload, _ = encryptBytes(otcp.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if otcp.rtid == 0 {
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
} else {
// write raley data
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
all = append(all, writeBytes...)
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", otcp.rtid, otcp.id, len(writeBytes))
}
}
}
otcp.conn.Close()
otcp.tunnel.overlayConns.Delete(otcp.id)
// notify peer disconnect
if otcp.isClient {
req := OverlayDisconnectReq{ID: otcp.id}
if otcp.rtid == 0 {
otcp.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
} else {
// write relay data
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
}
}
// calling by p2pTunnel
func (otcp *overlayTCP) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
n, err = otcp.conn.Write(buff)
if err != nil {
otcp.tunnel.overlayConns.Delete(otcp.id)
}
return
}

159
p2papp.go
View File

@@ -6,22 +6,26 @@ import (
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
type p2pApp struct {
config AppConfig
listener net.Listener
tunnel *P2PTunnel
rtid uint64
relayNode string
hbTime time.Time
hbMtx sync.Mutex
running bool
id uint64
key uint64
wg sync.WaitGroup
config AppConfig
listener net.Listener
listenerUDP *net.UDPConn
tunnel *P2PTunnel
rtid uint64
relayNode string
relayMode string
hbTime time.Time
hbMtx sync.Mutex
running bool
id uint64
key uint64
wg sync.WaitGroup
}
func (app *p2pApp) isActive() bool {
@@ -44,38 +48,42 @@ func (app *p2pApp) updateHeartbeat() {
}
func (app *p2pApp) listenTCP() error {
gLog.Printf(LevelDEBUG, "tcp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LevelDEBUG, "tcp accept on port %d end", app.config.SrcPort)
var err error
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
if err != nil {
gLog.Printf(LevelERROR, "listen error:%s", err)
return err
}
for {
for app.running {
conn, err := app.listener.Accept()
if err != nil {
gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err)
if app.running {
gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err)
}
break
}
otcp := overlayTCP{
oConn := overlayConn{
tunnel: app.tunnel,
conn: conn,
connTCP: conn,
id: rand.Uint64(),
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
}
// calc key bytes for encrypt
if otcp.appKey != 0 {
// pre-calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
otcp.appKeyBytes = encryptKey
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
}
app.tunnel.overlayConns.Store(otcp.id, &otcp)
gLog.Printf(LevelDEBUG, "Accept overlayID:%d", otcp.id)
app.tunnel.overlayConns.Store(oConn.id, &oConn)
gLog.Printf(LevelDEBUG, "Accept TCP overlayID:%d", oConn.id)
// tell peer connect
req := OverlayConnectReq{ID: otcp.id,
req := OverlayConnectReq{ID: oConn.id,
Token: app.tunnel.pn.config.Token,
DstIP: app.config.DstHost,
DstPort: app.config.DstPort,
@@ -92,29 +100,117 @@ func (app *p2pApp) listenTCP() error {
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
go oConn.run()
}
return nil
}
go otcp.run()
func (app *p2pApp) listenUDP() error {
gLog.Printf(LevelDEBUG, "udp accept on port %d start", app.config.SrcPort)
defer gLog.Printf(LevelDEBUG, "udp accept on port %d end", app.config.SrcPort)
var err error
app.listenerUDP, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: app.config.SrcPort})
if err != nil {
gLog.Printf(LevelERROR, "listen error:%s", err)
return err
}
buffer := make([]byte, 64*1024)
udpID := make([]byte, 8)
for {
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
len, remoteAddr, err := app.listenerUDP.ReadFrom(buffer)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
} else {
gLog.Printf(LevelERROR, "udp read failed:%s", err)
break
}
} else {
b := bytes.Buffer{}
b.Write(buffer[:len])
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
a := net.ParseIP(remoteIP)
udpID[0] = a[0]
udpID[1] = a[1]
udpID[2] = a[2]
udpID[3] = a[3]
udpID[4] = byte(port)
udpID[5] = byte(port >> 8)
id := binary.LittleEndian.Uint64(udpID)
s, ok := app.tunnel.overlayConns.Load(id)
if !ok {
oConn := overlayConn{
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpRelayData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, AESKeySize)
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
}
app.tunnel.overlayConns.Store(oConn.id, &oConn)
gLog.Printf(LevelDEBUG, "Accept UDP overlayID:%d", oConn.id)
// tell peer connect
req := OverlayConnectReq{ID: oConn.id,
Token: app.tunnel.pn.config.Token,
DstIP: app.config.DstHost,
DstPort: app.config.DstPort,
Protocol: app.config.Protocol,
AppID: app.id,
}
if app.rtid == 0 {
app.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayConnectReq, &req)
} else {
req.RelayTunnelID = app.tunnel.id
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, app.rtid)
msg, _ := newMessage(MsgP2P, MsgOverlayConnectReq, &req)
msgWithHead := append(relayHead.Bytes(), msg...)
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
go oConn.run()
oConn.udpRelayData <- b.Bytes()
}
// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
overlayConn, ok := s.(*overlayConn)
if !ok {
continue
}
overlayConn.udpRelayData <- b.Bytes()
}
}
return nil
}
func (app *p2pApp) listen() error {
gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
defer gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort)
defer gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort)
app.wg.Add(1)
defer app.wg.Done()
app.running = true
if app.rtid != 0 {
go app.relayHeartbeatLoop()
}
for app.running {
for app.tunnel.isRuning() && app.running {
if app.config.Protocol == "udp" {
app.listenTCP()
app.listenUDP()
} else {
app.listenTCP()
}
time.Sleep(time.Second * 5)
// TODO: listen UDP
time.Sleep(time.Second * 10)
}
return nil
}
@@ -124,6 +220,9 @@ func (app *p2pApp) close() {
if app.listener != nil {
app.listener.Close()
}
if app.listenerUDP != nil {
app.listenerUDP.Close()
}
if app.tunnel != nil {
app.tunnel.closeOverlayConns(app.id)
}

View File

@@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net"
"net/url"
@@ -96,6 +97,9 @@ func (pn *P2PNetwork) runAll() {
gConf.mtx.Lock()
defer gConf.mtx.Unlock()
for _, config := range gConf.Apps {
if config.nextRetryTime.After(time.Now()) {
continue
}
if config.Enabled == 0 {
continue
}
@@ -103,37 +107,37 @@ func (pn *P2PNetwork) runAll() {
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)
}
appExist := false
appActive := false
var appID uint64
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
if ok {
app := i.(*p2pApp)
appExist = true
appID = app.id
if app.isActive() {
appActive = true
}
}
if appExist && appActive {
continue
}
if appExist && !appActive {
gLog.Printf(LevelINFO, "detect app %s disconnect, reconnecting...", config.AppName)
pn.DeleteApp(config)
if config.retryTime.Add(time.Minute * 15).Before(time.Now()) {
config.retryNum = 0
}
config.retryNum++
config.retryTime = time.Now()
if config.retryNum > MaxRetry {
gLog.Printf(LevelERROR, "app %s%d retry more than %d times, exit.", config.Protocol, config.SrcPort, MaxRetry)
continue
}
}
go pn.AddApp(config)
if appExist {
pn.DeleteApp(*config)
}
if config.retryNum > 0 {
gLog.Printf(LevelINFO, "detect app %s(%d) disconnect, reconnecting the %d times...", config.AppName, appID, config.retryNum)
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min
config.retryNum = 0
}
}
config.retryNum++
config.retryTime = time.Now()
increase := math.Pow(1.3, float64(config.retryNum))
if increase > 900 {
increase = 900
}
config.nextRetryTime = time.Now().Add(time.Second * time.Duration(increase)) // exponential increase retry time. 1.3^x
pn.AddApp(*config)
}
}
func (pn *P2PNetwork) autorunApp() {
gLog.Println(LevelINFO, "autorunApp start")
// TODO: use gConf to check reconnect
for pn.running {
time.Sleep(time.Second)
if !pn.online {
@@ -145,23 +149,23 @@ func (pn *P2PNetwork) autorunApp() {
gLog.Println(LevelINFO, "autorunApp end")
}
func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint64) (*P2PTunnel, uint64, error) {
func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint64) (*P2PTunnel, uint64, string, error) {
gLog.Printf(LevelINFO, "addRelayTunnel to %s start", config.PeerNode)
defer gLog.Printf(LevelINFO, "addRelayTunnel to %s end", config.PeerNode)
pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode})
head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10)
if head == nil {
return nil, 0, errors.New("read MsgRelayNodeRsp error")
return nil, 0, "", errors.New("read MsgRelayNodeRsp error")
}
rsp := RelayNodeRsp{}
err := json.Unmarshal(body, &rsp)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error")
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
if rsp.RelayName == "" || rsp.RelayToken == 0 {
gLog.Printf(LevelERROR, "MsgRelayNodeReq error")
return nil, 0, errors.New("MsgRelayNodeReq error")
return nil, 0, "", errors.New("MsgRelayNodeReq error")
}
gLog.Printf(LevelINFO, "got relay node:%s", rsp.RelayName)
relayConfig := config
@@ -170,7 +174,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint
t, err := pn.addDirectTunnel(relayConfig, 0)
if err != nil {
gLog.Println(LevelERROR, "direct connect error:", err)
return nil, 0, err
return nil, 0, "", err
}
// notify peer addRelayTunnel
req := AddRelayTunnelReq{
@@ -187,15 +191,15 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint
head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value
if head == nil {
gLog.Printf(LevelERROR, "read MsgPushAddRelayTunnelRsp error")
return nil, 0, errors.New("read MsgPushAddRelayTunnelRsp error")
return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error")
}
rspID := TunnelMsg{}
err = json.Unmarshal(body, &rspID)
if err != nil {
gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err)
return nil, 0, errors.New("unmarshal MsgRelayNodeRsp error")
return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error")
}
return t, rspID.ID, err
return t, rspID.ID, rsp.Mode, err
}
func (pn *P2PNetwork) AddApp(config AppConfig) error {
@@ -215,24 +219,26 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
}
appID := rand.Uint64()
appKey := uint64(0)
t, err := pn.addDirectTunnel(config, 0)
var rtid uint64
relayNode := ""
relayMode := ""
peerNatType := NATUnknown
peerIP := ""
errMsg := ""
if err != nil && err == ErrorHandshake {
gLog.Println(LevelERROR, "direct connect failed, try to relay")
appKey = rand.Uint64()
t, rtid, err = pn.addRelayTunnel(config, appID, appKey)
if t != nil {
relayNode = t.config.PeerNode
}
}
t, err := pn.addDirectTunnel(config, 0)
if t != nil {
peerNatType = t.config.peerNatType
peerIP = t.config.peerIP
}
if err != nil && err == ErrorHandshake {
gLog.Println(LevelERROR, "direct connect failed, try to relay")
appKey = rand.Uint64()
t, rtid, relayMode, err = pn.addRelayTunnel(config, appID, appKey)
if t != nil {
relayNode = t.config.PeerNode
}
}
if err != nil {
errMsg = err.Error()
}
@@ -261,6 +267,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error {
config: config,
rtid: rtid,
relayNode: relayNode,
relayMode: relayMode,
hbTime: time.Now()}
pn.apps.Store(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort), &app)
if err == nil {

View File

@@ -276,7 +276,7 @@ func (t *P2PTunnel) readLoop() {
gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID)
continue
}
overlayConn, ok := s.(*overlayTCP)
overlayConn, ok := s.(*overlayConn)
if !ok {
continue
}
@@ -333,32 +333,34 @@ func (t *P2PTunnel) readLoop() {
overlayID := req.ID
gLog.Printf(LevelDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req)
if req.Protocol == "tcp" {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
if err != nil {
gLog.Println(LevelERROR, err)
continue
}
otcp := overlayTCP{
tunnel: t,
conn: conn,
id: overlayID,
isClient: false,
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
}
// calc key bytes for encrypt
if otcp.appKey != 0 {
encryptKey := make([]byte, 16)
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
otcp.appKeyBytes = encryptKey
}
t.overlayConns.Store(otcp.id, &otcp)
go otcp.run()
oConn := overlayConn{
tunnel: t,
id: overlayID,
isClient: false,
rtid: req.RelayTunnelID,
appID: req.AppID,
appKey: GetKey(req.AppID),
}
if req.Protocol == "udp" {
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
} else {
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
}
if err != nil {
gLog.Println(LevelERROR, err)
continue
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
encryptKey := make([]byte, 16)
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
oConn.appKeyBytes = encryptKey
}
t.overlayConns.Store(oConn.id, &oConn)
go oConn.run()
case MsgOverlayDisconnectReq:
req := OverlayDisconnectReq{}
err := json.Unmarshal(body, &req)
@@ -370,8 +372,8 @@ func (t *P2PTunnel) readLoop() {
gLog.Printf(LevelDEBUG, "%d disconnect overlay connection %d", t.id, overlayID)
i, ok := t.overlayConns.Load(overlayID)
if ok {
otcp := i.(*overlayTCP)
otcp.running = false
oConn := i.(*overlayConn)
oConn.running = false
}
default:
}
@@ -408,9 +410,16 @@ func (t *P2PTunnel) listen() error {
func (t *P2PTunnel) closeOverlayConns(appID uint64) {
t.overlayConns.Range(func(_, i interface{}) bool {
otcp := i.(*overlayTCP)
if otcp.appID == appID {
otcp.conn.Close()
oConn := i.(*overlayConn)
if oConn.appID == appID {
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
}
return true
})

View File

@@ -10,7 +10,7 @@ import (
"time"
)
const OpenP2PVersion = "1.1.0"
const OpenP2PVersion = "1.3.0"
const ProducnName string = "openp2p"
type openP2PHeader struct {
@@ -117,7 +117,7 @@ const (
)
const (
ReadBuffLen = 1024
ReadBuffLen = 4096 // for UDP maybe not enough
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
TunnelHeartbeatTime = time.Second * 15
TunnelIdleTimeout = time.Minute
@@ -236,6 +236,7 @@ type RelayNodeReq struct {
}
type RelayNodeRsp struct {
Mode string `json:"mode,omitempty"` // private,public
RelayName string `json:"relayName,omitempty"`
RelayToken uint64 `json:"relayToken,omitempty"`
}
@@ -294,6 +295,7 @@ type AppInfo struct {
PeerIP string `json:"peerIP,omitempty"`
ShareBandwidth int `json:"shareBandWidth,omitempty"`
RelayNode string `json:"relayNode,omitempty"`
RelayMode string `json:"relayMode,omitempty"`
Version string `json:"version,omitempty"`
RetryTime string `json:"retryTime,omitempty"`
IsActive int `json:"isActive,omitempty"`