mirror of
https://github.com/openp2p-cn/openp2p.git
synced 2026-05-08 15:03:32 +08:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b8d3f7d47 | ||
|
|
26e0fdf605 |
@@ -39,7 +39,12 @@ P2P直连可以让你的设备跑满带宽。不论你的设备在任何网络
|
||||
分别在本地和远程电脑下载后双击运行,一键安装
|
||||
|
||||

|
||||
|
||||
Windows默认会阻止没有花钱买它家证书签名过的程序,选择“仍要运行”即可。
|
||||
|
||||

|
||||
|
||||

|
||||
### 3.新建P2P应用
|
||||
|
||||

|
||||
@@ -120,8 +125,7 @@ go build
|
||||
|
||||
## 参与贡献
|
||||
TODO或ISSUE里如果有你擅长的领域,或者你有特别好的主意,可以加入OpenP2P项目,贡献你的代码。待项目茁壮成长后,你们就是知名开源项目的主要代码贡献者,岂不快哉。
|
||||
## 商业合作
|
||||
它是一个中国人发起的项目,更懂国内网络环境,更懂用户需求,更好的企业级支持
|
||||
|
||||
## 技术交流
|
||||
QQ群:16947733
|
||||
邮箱:openp2p.cn@gmail.com tenderiron@139.com
|
||||
|
||||
@@ -43,6 +43,12 @@ Download on local and remote computers and double-click to run, one-click instal
|
||||
|
||||

|
||||
|
||||
By default, Windows will block programs that have not been signed by the Microsoft's certificate, and you can select "Run anyway".
|
||||
|
||||

|
||||
|
||||

|
||||
|
||||
### 3.New P2PApp
|
||||
|
||||

|
||||
|
||||
49
config.go
49
config.go
@@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -32,15 +31,17 @@ type AppConfig struct {
|
||||
retryTime time.Time
|
||||
nextRetryTime time.Time
|
||||
shareBandwidth int
|
||||
errMsg string
|
||||
connectTime time.Time
|
||||
}
|
||||
|
||||
// TODO: add loglevel, maxlogfilesize
|
||||
type Config struct {
|
||||
Network NetworkConfig `json:"network"`
|
||||
Apps []*AppConfig `json:"apps"`
|
||||
LogLevel int
|
||||
|
||||
mtx sync.Mutex
|
||||
Network NetworkConfig `json:"network"`
|
||||
Apps []*AppConfig `json:"apps"`
|
||||
LogLevel int
|
||||
daemonMode bool
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func (c *Config) switchApp(app AppConfig, enabled int) {
|
||||
@@ -115,6 +116,27 @@ func (c *Config) load() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Config) setToken(token uint64) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.Network.Token = token
|
||||
}
|
||||
func (c *Config) setUser(user string) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.Network.User = user
|
||||
}
|
||||
func (c *Config) setNode(node string) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.Network.Node = node
|
||||
}
|
||||
func (c *Config) setShareBandwidth(bw int) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
c.Network.ShareBandwidth = bw
|
||||
}
|
||||
|
||||
type NetworkConfig struct {
|
||||
// local info
|
||||
Token uint64
|
||||
@@ -147,6 +169,7 @@ func parseParams() {
|
||||
appName := flag.String("appname", "", "app name")
|
||||
shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit")
|
||||
daemonMode := flag.Bool("d", false, "daemonMode")
|
||||
notVerbose := flag.Bool("nv", false, "not log console")
|
||||
logLevel := flag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
|
||||
flag.Parse()
|
||||
|
||||
@@ -161,8 +184,8 @@ func parseParams() {
|
||||
if config.SrcPort != 0 {
|
||||
gConf.add(config, true)
|
||||
}
|
||||
gConf.mtx.Lock()
|
||||
|
||||
// gConf.mtx.Lock() // when calling this func it's single-thread no lock
|
||||
gConf.daemonMode = *daemonMode
|
||||
// spec paramters in commandline will always be used
|
||||
flag.Visit(func(f *flag.Flag) {
|
||||
if f.Name == "sharebandwidth" {
|
||||
@@ -203,11 +226,9 @@ func parseParams() {
|
||||
gConf.Network.UDPPort1 = 27182
|
||||
gConf.Network.UDPPort2 = 27183
|
||||
gLog.setLevel(LogLevel(gConf.LogLevel))
|
||||
gConf.mtx.Unlock()
|
||||
gConf.save()
|
||||
if *daemonMode {
|
||||
d := daemon{}
|
||||
d.run()
|
||||
os.Exit(0)
|
||||
if *notVerbose {
|
||||
gLog.setMode(LogFile)
|
||||
}
|
||||
// gConf.mtx.Unlock()
|
||||
gConf.save()
|
||||
}
|
||||
|
||||
58
daemon.go
58
daemon.go
@@ -64,10 +64,18 @@ func (d *daemon) run() {
|
||||
break
|
||||
}
|
||||
}
|
||||
args = append(args, "-nv")
|
||||
for {
|
||||
// start worker
|
||||
tmpDump := filepath.Join("log", "dump.log.tmp")
|
||||
dumpFile := filepath.Join("log", "dump.log")
|
||||
f, err := os.Create(filepath.Join(tmpDump))
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "start worker error:%s", err)
|
||||
return
|
||||
}
|
||||
gLog.Println(LevelINFO, "start worker process, args:", args)
|
||||
execSpec := &os.ProcAttr{Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}}
|
||||
execSpec := &os.ProcAttr{Env: append(os.Environ(), "GOTRACEBACK=crash"), Files: []*os.File{os.Stdin, os.Stdout, f}}
|
||||
p, err := os.StartProcess(binPath, args, execSpec)
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "start worker error:%s", err)
|
||||
@@ -75,6 +83,12 @@ func (d *daemon) run() {
|
||||
}
|
||||
d.proc = p
|
||||
_, _ = p.Wait()
|
||||
f.Close()
|
||||
time.Sleep(time.Second)
|
||||
err = os.Rename(tmpDump, dumpFile)
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "rename dump error:%s", err)
|
||||
}
|
||||
if !d.running {
|
||||
return
|
||||
}
|
||||
@@ -115,6 +129,17 @@ func install() {
|
||||
gLog.Println(LevelINFO, "install start")
|
||||
defer gLog.Println(LevelINFO, "install end")
|
||||
// auto uninstall
|
||||
err := os.MkdirAll(defaultInstallPath, 0775)
|
||||
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err)
|
||||
return
|
||||
}
|
||||
err = os.Chdir(defaultInstallPath)
|
||||
if err != nil {
|
||||
gLog.Println(LevelERROR, "cd error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
uninstall()
|
||||
// save config file
|
||||
@@ -132,19 +157,22 @@ func install() {
|
||||
shareBandwidth := installFlag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit")
|
||||
logLevel := installFlag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error")
|
||||
installFlag.Parse(os.Args[2:])
|
||||
if *node != "" && len(*node) < 8 {
|
||||
gLog.Println(LevelERROR, ErrNodeTooShort)
|
||||
os.Exit(9)
|
||||
}
|
||||
if *node == "" { // if node name not set. use os.Hostname
|
||||
hostname := defaultNodeName()
|
||||
node = &hostname
|
||||
}
|
||||
|
||||
gConf.load() // load old config. otherwise will clear all apps
|
||||
gConf.LogLevel = *logLevel
|
||||
gConf.Network.ServerHost = *serverHost
|
||||
gConf.Network.Token = *token
|
||||
gConf.Network.Node = *node
|
||||
if *node != "" {
|
||||
if len(*node) < 8 {
|
||||
gLog.Println(LevelERROR, ErrNodeTooShort)
|
||||
os.Exit(9)
|
||||
}
|
||||
gConf.Network.Node = *node
|
||||
} else {
|
||||
if gConf.Network.Node == "" { // if node name not set. use os.Hostname
|
||||
gConf.Network.Node = defaultNodeName()
|
||||
}
|
||||
}
|
||||
gConf.Network.ServerPort = 27183
|
||||
gConf.Network.UDPPort1 = 27182
|
||||
gConf.Network.UDPPort2 = 27183
|
||||
@@ -159,16 +187,6 @@ func install() {
|
||||
if config.SrcPort != 0 {
|
||||
gConf.add(config, true)
|
||||
}
|
||||
err := os.MkdirAll(defaultInstallPath, 0775)
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err)
|
||||
return
|
||||
}
|
||||
err = os.Chdir(defaultInstallPath)
|
||||
if err != nil {
|
||||
gLog.Println(LevelERROR, "cd error:", err)
|
||||
return
|
||||
}
|
||||
gConf.save()
|
||||
targetPath := filepath.Join(defaultInstallPath, defaultBinName)
|
||||
d := daemon{}
|
||||
|
||||
@@ -110,7 +110,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
|
||||
case MsgPushReportApps:
|
||||
gLog.Println(LevelINFO, "MsgPushReportApps")
|
||||
req := ReportApps{}
|
||||
// TODO: add the retrying apps
|
||||
gConf.mtx.Lock()
|
||||
defer gConf.mtx.Unlock()
|
||||
for _, config := range gConf.Apps {
|
||||
@@ -128,6 +127,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
|
||||
}
|
||||
appInfo := AppInfo{
|
||||
AppName: config.AppName,
|
||||
Error: config.errMsg,
|
||||
Protocol: config.Protocol,
|
||||
SrcPort: config.SrcPort,
|
||||
RelayNode: relayNode,
|
||||
@@ -138,7 +138,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
|
||||
PeerUser: config.PeerUser,
|
||||
PeerIP: config.peerIP,
|
||||
PeerNatType: config.peerNatType,
|
||||
RetryTime: config.retryTime.String(),
|
||||
RetryTime: config.retryTime.Local().Format("2006-01-02T15:04:05-0700"),
|
||||
ConnectTime: config.connectTime.Local().Format("2006-01-02T15:04:05-0700"),
|
||||
IsActive: appActive,
|
||||
Enabled: config.Enabled,
|
||||
}
|
||||
@@ -181,10 +182,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
|
||||
gLog.Printf(LevelERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:]))
|
||||
return err
|
||||
}
|
||||
gConf.mtx.Lock()
|
||||
gConf.Network.Node = req.NewName
|
||||
gConf.Network.ShareBandwidth = req.Bandwidth
|
||||
gConf.mtx.Unlock()
|
||||
gConf.setNode(req.NewName)
|
||||
gConf.setShareBandwidth(req.Bandwidth)
|
||||
gConf.save()
|
||||
// TODO: hot reload
|
||||
os.Exit(0)
|
||||
|
||||
9
log.go
9
log.go
@@ -97,6 +97,11 @@ func (vl *V8log) setLevel(level LogLevel) {
|
||||
defer vl.mtx.Unlock()
|
||||
vl.level = level
|
||||
}
|
||||
func (vl *V8log) setMode(mode int) {
|
||||
vl.mtx.Lock()
|
||||
defer vl.mtx.Unlock()
|
||||
vl.mode = mode
|
||||
}
|
||||
|
||||
func (vl *V8log) checkFile() {
|
||||
if vl.maxLogSize <= 0 {
|
||||
@@ -110,10 +115,10 @@ func (vl *V8log) checkFile() {
|
||||
for l, logFile := range vl.files {
|
||||
f, e := logFile.Stat()
|
||||
if e != nil {
|
||||
break
|
||||
continue
|
||||
}
|
||||
if f.Size() <= vl.maxLogSize {
|
||||
break
|
||||
continue
|
||||
}
|
||||
logFile.Close()
|
||||
fname := f.Name()
|
||||
|
||||
10
openp2p.go
10
openp2p.go
@@ -13,7 +13,6 @@ func main() {
|
||||
binDir := filepath.Dir(os.Args[0])
|
||||
os.Chdir(binDir) // for system service
|
||||
gLog = InitLogger(binDir, "openp2p", LevelDEBUG, 1024*1024, LogFileAndConsole)
|
||||
|
||||
// TODO: install sub command, deamon process
|
||||
if len(os.Args) > 1 {
|
||||
switch os.Args[1] {
|
||||
@@ -41,9 +40,16 @@ func main() {
|
||||
} else {
|
||||
installByFilename()
|
||||
}
|
||||
parseParams()
|
||||
gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion)
|
||||
gLog.Println(LevelINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com")
|
||||
parseParams()
|
||||
|
||||
if gConf.daemonMode {
|
||||
d := daemon{}
|
||||
d.run()
|
||||
return
|
||||
}
|
||||
|
||||
gLog.Println(LevelINFO, &gConf)
|
||||
setFirewall()
|
||||
network := P2PNetworkInstance(&gConf.Network)
|
||||
|
||||
150
overlay.go
Normal file
150
overlay.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrDeadlineExceeded error = &DeadlineExceededError{}
|
||||
|
||||
// DeadlineExceededError is returned for an expired deadline.
|
||||
type DeadlineExceededError struct{}
|
||||
|
||||
// Implement the net.Error interface.
|
||||
// The string is "i/o timeout" because that is what was returned
|
||||
// by earlier Go versions. Changing it may break programs that
|
||||
// match on error strings.
|
||||
func (e *DeadlineExceededError) Error() string { return "i/o timeout" }
|
||||
func (e *DeadlineExceededError) Timeout() bool { return true }
|
||||
func (e *DeadlineExceededError) Temporary() bool { return true }
|
||||
|
||||
// implement io.Writer
|
||||
type overlayConn struct {
|
||||
tunnel *P2PTunnel
|
||||
connTCP net.Conn
|
||||
id uint64
|
||||
rtid uint64
|
||||
running bool
|
||||
isClient bool
|
||||
appID uint64
|
||||
appKey uint64
|
||||
appKeyBytes []byte
|
||||
// for udp
|
||||
connUDP *net.UDPConn
|
||||
remoteAddr net.Addr
|
||||
udpRelayData chan []byte
|
||||
lastReadUDPTs time.Time
|
||||
}
|
||||
|
||||
func (oConn *overlayConn) run() {
|
||||
gLog.Printf(LevelDEBUG, "%d overlayConn run start", oConn.id)
|
||||
defer gLog.Printf(LevelDEBUG, "%d overlayConn run end", oConn.id)
|
||||
oConn.running = true
|
||||
oConn.lastReadUDPTs = time.Now()
|
||||
buffer := make([]byte, ReadBuffLen+PaddingSize)
|
||||
readBuf := buffer[:ReadBuffLen]
|
||||
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
|
||||
tunnelHead := new(bytes.Buffer)
|
||||
relayHead := new(bytes.Buffer)
|
||||
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
|
||||
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
|
||||
for oConn.running && oConn.tunnel.isRuning() {
|
||||
buff, dataLen, err := oConn.Read(readBuf)
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
continue
|
||||
}
|
||||
// overlay tcp connection normal close, debug log
|
||||
gLog.Printf(LevelDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
|
||||
break
|
||||
}
|
||||
payload := buff[:dataLen]
|
||||
if oConn.appKey != 0 {
|
||||
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
|
||||
}
|
||||
writeBytes := append(tunnelHead.Bytes(), payload...)
|
||||
if oConn.rtid == 0 {
|
||||
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
|
||||
gLog.Printf(LevelDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
|
||||
} else {
|
||||
// write raley data
|
||||
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
|
||||
all = append(all, writeBytes...)
|
||||
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
|
||||
gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes))
|
||||
}
|
||||
}
|
||||
if oConn.connTCP != nil {
|
||||
oConn.connTCP.Close()
|
||||
}
|
||||
if oConn.connUDP != nil {
|
||||
oConn.connUDP.Close()
|
||||
}
|
||||
oConn.tunnel.overlayConns.Delete(oConn.id)
|
||||
// notify peer disconnect
|
||||
if oConn.isClient {
|
||||
req := OverlayDisconnectReq{ID: oConn.id}
|
||||
if oConn.rtid == 0 {
|
||||
oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
|
||||
} else {
|
||||
// write relay data
|
||||
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
|
||||
msgWithHead := append(relayHead.Bytes(), msg...)
|
||||
oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
|
||||
if oConn.connUDP != nil {
|
||||
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
|
||||
err = errors.New("udp close")
|
||||
return
|
||||
}
|
||||
if oConn.remoteAddr != nil { // as server
|
||||
select {
|
||||
case buff = <-oConn.udpRelayData:
|
||||
n = len(buff)
|
||||
oConn.lastReadUDPTs = time.Now()
|
||||
case <-time.After(time.Second * 10):
|
||||
err = ErrDeadlineExceeded
|
||||
}
|
||||
} else { // as client
|
||||
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
|
||||
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
|
||||
if err == nil {
|
||||
oConn.lastReadUDPTs = time.Now()
|
||||
}
|
||||
buff = reuseBuff
|
||||
}
|
||||
return
|
||||
}
|
||||
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
|
||||
n, err = oConn.connTCP.Read(reuseBuff)
|
||||
buff = reuseBuff
|
||||
return
|
||||
}
|
||||
|
||||
// calling by p2pTunnel
|
||||
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
|
||||
// add mutex when multi-thread calling
|
||||
if oConn.connUDP != nil {
|
||||
if oConn.remoteAddr == nil {
|
||||
n, err = oConn.connUDP.Write(buff)
|
||||
} else {
|
||||
n, err = oConn.connUDP.WriteTo(buff, oConn.remoteAddr)
|
||||
}
|
||||
if err != nil {
|
||||
oConn.running = false
|
||||
}
|
||||
return
|
||||
}
|
||||
n, err = oConn.connTCP.Write(buff)
|
||||
if err != nil {
|
||||
oConn.running = false
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// implement io.Writer
|
||||
type overlayTCP struct {
|
||||
tunnel *P2PTunnel
|
||||
conn net.Conn
|
||||
id uint64
|
||||
rtid uint64
|
||||
running bool
|
||||
isClient bool
|
||||
appID uint64
|
||||
appKey uint64
|
||||
appKeyBytes []byte
|
||||
}
|
||||
|
||||
func (otcp *overlayTCP) run() {
|
||||
gLog.Printf(LevelDEBUG, "%d overlayTCP run start", otcp.id)
|
||||
defer gLog.Printf(LevelDEBUG, "%d overlayTCP run end", otcp.id)
|
||||
otcp.running = true
|
||||
buffer := make([]byte, ReadBuffLen+PaddingSize)
|
||||
readBuf := buffer[:ReadBuffLen]
|
||||
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
|
||||
tunnelHead := new(bytes.Buffer)
|
||||
relayHead := new(bytes.Buffer)
|
||||
binary.Write(relayHead, binary.LittleEndian, otcp.rtid)
|
||||
binary.Write(tunnelHead, binary.LittleEndian, otcp.id)
|
||||
for otcp.running && otcp.tunnel.isRuning() {
|
||||
otcp.conn.SetReadDeadline(time.Now().Add(time.Second * 5))
|
||||
dataLen, err := otcp.conn.Read(readBuf)
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
continue
|
||||
}
|
||||
// overlay tcp connection normal close, debug log
|
||||
gLog.Printf(LevelDEBUG, "overlayTCP %d read error:%s,close it", otcp.id, err)
|
||||
break
|
||||
} else {
|
||||
payload := readBuf[:dataLen]
|
||||
if otcp.appKey != 0 {
|
||||
payload, _ = encryptBytes(otcp.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
|
||||
}
|
||||
writeBytes := append(tunnelHead.Bytes(), payload...)
|
||||
if otcp.rtid == 0 {
|
||||
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes)
|
||||
} else {
|
||||
// write raley data
|
||||
all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...)
|
||||
all = append(all, writeBytes...)
|
||||
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all)
|
||||
gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", otcp.rtid, otcp.id, len(writeBytes))
|
||||
}
|
||||
}
|
||||
}
|
||||
otcp.conn.Close()
|
||||
otcp.tunnel.overlayConns.Delete(otcp.id)
|
||||
// notify peer disconnect
|
||||
if otcp.isClient {
|
||||
req := OverlayDisconnectReq{ID: otcp.id}
|
||||
if otcp.rtid == 0 {
|
||||
otcp.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
|
||||
} else {
|
||||
// write relay data
|
||||
msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req)
|
||||
msgWithHead := append(relayHead.Bytes(), msg...)
|
||||
otcp.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calling by p2pTunnel
|
||||
func (otcp *overlayTCP) Write(buff []byte) (n int, err error) {
|
||||
// add mutex when multi-thread calling
|
||||
n, err = otcp.conn.Write(buff)
|
||||
if err != nil {
|
||||
otcp.tunnel.overlayConns.Delete(otcp.id)
|
||||
}
|
||||
return
|
||||
}
|
||||
172
p2papp.go
172
p2papp.go
@@ -6,23 +6,26 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type p2pApp struct {
|
||||
config AppConfig
|
||||
listener net.Listener
|
||||
tunnel *P2PTunnel
|
||||
rtid uint64
|
||||
relayNode string
|
||||
relayMode string
|
||||
hbTime time.Time
|
||||
hbMtx sync.Mutex
|
||||
running bool
|
||||
id uint64
|
||||
key uint64
|
||||
wg sync.WaitGroup
|
||||
config AppConfig
|
||||
listener net.Listener
|
||||
listenerUDP *net.UDPConn
|
||||
tunnel *P2PTunnel
|
||||
rtid uint64
|
||||
relayNode string
|
||||
relayMode string
|
||||
hbTime time.Time
|
||||
hbMtx sync.Mutex
|
||||
running bool
|
||||
id uint64
|
||||
key uint64
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (app *p2pApp) isActive() bool {
|
||||
@@ -45,43 +48,42 @@ func (app *p2pApp) updateHeartbeat() {
|
||||
}
|
||||
|
||||
func (app *p2pApp) listenTCP() error {
|
||||
gLog.Printf(LevelDEBUG, "tcp accept on port %d start", app.config.SrcPort)
|
||||
defer gLog.Printf(LevelDEBUG, "tcp accept on port %d end", app.config.SrcPort)
|
||||
var err error
|
||||
if app.config.Protocol == "udp" {
|
||||
app.listener, err = net.Listen("udp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
|
||||
} else {
|
||||
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
|
||||
}
|
||||
|
||||
app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort))
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "listen error:%s", err)
|
||||
return err
|
||||
}
|
||||
for {
|
||||
for app.running {
|
||||
conn, err := app.listener.Accept()
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err)
|
||||
if app.running {
|
||||
gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err)
|
||||
}
|
||||
break
|
||||
}
|
||||
otcp := overlayTCP{
|
||||
oConn := overlayConn{
|
||||
tunnel: app.tunnel,
|
||||
conn: conn,
|
||||
connTCP: conn,
|
||||
id: rand.Uint64(),
|
||||
isClient: true,
|
||||
rtid: app.rtid,
|
||||
appID: app.id,
|
||||
appKey: app.key,
|
||||
}
|
||||
// calc key bytes for encrypt
|
||||
if otcp.appKey != 0 {
|
||||
// pre-calc key bytes for encrypt
|
||||
if oConn.appKey != 0 {
|
||||
encryptKey := make([]byte, AESKeySize)
|
||||
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
|
||||
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
|
||||
otcp.appKeyBytes = encryptKey
|
||||
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
|
||||
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
|
||||
oConn.appKeyBytes = encryptKey
|
||||
}
|
||||
app.tunnel.overlayConns.Store(otcp.id, &otcp)
|
||||
gLog.Printf(LevelDEBUG, "Accept overlayID:%d", otcp.id)
|
||||
app.tunnel.overlayConns.Store(oConn.id, &oConn)
|
||||
gLog.Printf(LevelDEBUG, "Accept TCP overlayID:%d", oConn.id)
|
||||
// tell peer connect
|
||||
req := OverlayConnectReq{ID: otcp.id,
|
||||
req := OverlayConnectReq{ID: oConn.id,
|
||||
Token: app.tunnel.pn.config.Token,
|
||||
DstIP: app.config.DstHost,
|
||||
DstPort: app.config.DstPort,
|
||||
@@ -98,26 +100,117 @@ func (app *p2pApp) listenTCP() error {
|
||||
msgWithHead := append(relayHead.Bytes(), msg...)
|
||||
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
|
||||
}
|
||||
go oConn.run()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
go otcp.run()
|
||||
func (app *p2pApp) listenUDP() error {
|
||||
gLog.Printf(LevelDEBUG, "udp accept on port %d start", app.config.SrcPort)
|
||||
defer gLog.Printf(LevelDEBUG, "udp accept on port %d end", app.config.SrcPort)
|
||||
var err error
|
||||
app.listenerUDP, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: app.config.SrcPort})
|
||||
if err != nil {
|
||||
gLog.Printf(LevelERROR, "listen error:%s", err)
|
||||
return err
|
||||
}
|
||||
buffer := make([]byte, 64*1024)
|
||||
udpID := make([]byte, 8)
|
||||
for {
|
||||
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
len, remoteAddr, err := app.listenerUDP.ReadFrom(buffer)
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
continue
|
||||
} else {
|
||||
gLog.Printf(LevelERROR, "udp read failed:%s", err)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
b := bytes.Buffer{}
|
||||
b.Write(buffer[:len])
|
||||
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
|
||||
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
|
||||
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
|
||||
a := net.ParseIP(remoteIP)
|
||||
udpID[0] = a[0]
|
||||
udpID[1] = a[1]
|
||||
udpID[2] = a[2]
|
||||
udpID[3] = a[3]
|
||||
udpID[4] = byte(port)
|
||||
udpID[5] = byte(port >> 8)
|
||||
id := binary.LittleEndian.Uint64(udpID)
|
||||
s, ok := app.tunnel.overlayConns.Load(id)
|
||||
if !ok {
|
||||
oConn := overlayConn{
|
||||
tunnel: app.tunnel,
|
||||
connUDP: app.listenerUDP,
|
||||
remoteAddr: remoteAddr,
|
||||
udpRelayData: make(chan []byte, 1000),
|
||||
id: id,
|
||||
isClient: true,
|
||||
rtid: app.rtid,
|
||||
appID: app.id,
|
||||
appKey: app.key,
|
||||
}
|
||||
// calc key bytes for encrypt
|
||||
if oConn.appKey != 0 {
|
||||
encryptKey := make([]byte, AESKeySize)
|
||||
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
|
||||
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
|
||||
oConn.appKeyBytes = encryptKey
|
||||
}
|
||||
app.tunnel.overlayConns.Store(oConn.id, &oConn)
|
||||
gLog.Printf(LevelDEBUG, "Accept UDP overlayID:%d", oConn.id)
|
||||
// tell peer connect
|
||||
req := OverlayConnectReq{ID: oConn.id,
|
||||
Token: app.tunnel.pn.config.Token,
|
||||
DstIP: app.config.DstHost,
|
||||
DstPort: app.config.DstPort,
|
||||
Protocol: app.config.Protocol,
|
||||
AppID: app.id,
|
||||
}
|
||||
if app.rtid == 0 {
|
||||
app.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayConnectReq, &req)
|
||||
} else {
|
||||
req.RelayTunnelID = app.tunnel.id
|
||||
relayHead := new(bytes.Buffer)
|
||||
binary.Write(relayHead, binary.LittleEndian, app.rtid)
|
||||
msg, _ := newMessage(MsgP2P, MsgOverlayConnectReq, &req)
|
||||
msgWithHead := append(relayHead.Bytes(), msg...)
|
||||
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
|
||||
}
|
||||
go oConn.run()
|
||||
oConn.udpRelayData <- b.Bytes()
|
||||
}
|
||||
|
||||
// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
|
||||
overlayConn, ok := s.(*overlayConn)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
overlayConn.udpRelayData <- b.Bytes()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *p2pApp) listen() error {
|
||||
gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
|
||||
defer gLog.Printf(LevelINFO, "LISTEN ON PORT %d START", app.config.SrcPort)
|
||||
gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort)
|
||||
defer gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort)
|
||||
app.wg.Add(1)
|
||||
defer app.wg.Done()
|
||||
app.running = true
|
||||
if app.rtid != 0 {
|
||||
go app.relayHeartbeatLoop()
|
||||
}
|
||||
for app.running {
|
||||
|
||||
app.listenTCP()
|
||||
|
||||
time.Sleep(time.Second * 5)
|
||||
for app.tunnel.isRuning() && app.running {
|
||||
if app.config.Protocol == "udp" {
|
||||
app.listenUDP()
|
||||
} else {
|
||||
app.listenTCP()
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -127,6 +220,9 @@ func (app *p2pApp) close() {
|
||||
if app.listener != nil {
|
||||
app.listener.Close()
|
||||
}
|
||||
if app.listenerUDP != nil {
|
||||
app.listenerUDP.Close()
|
||||
}
|
||||
if app.tunnel != nil {
|
||||
app.tunnel.closeOverlayConns(app.id)
|
||||
}
|
||||
|
||||
@@ -94,9 +94,11 @@ func (pn *P2PNetwork) Connect(timeout int) bool {
|
||||
}
|
||||
|
||||
func (pn *P2PNetwork) runAll() {
|
||||
gConf.mtx.Lock()
|
||||
gConf.mtx.Lock() // lock for copy gConf.Apps and the modification of config(it's pointer)
|
||||
defer gConf.mtx.Unlock()
|
||||
for _, config := range gConf.Apps {
|
||||
allApps := gConf.Apps // read a copy, other thread will modify the gConf.Apps
|
||||
|
||||
for _, config := range allApps {
|
||||
if config.nextRetryTime.After(time.Now()) {
|
||||
continue
|
||||
}
|
||||
@@ -107,23 +109,21 @@ func (pn *P2PNetwork) runAll() {
|
||||
config.AppName = fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)
|
||||
}
|
||||
appExist := false
|
||||
appActive := false
|
||||
var appID uint64
|
||||
i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort))
|
||||
if ok {
|
||||
app := i.(*p2pApp)
|
||||
appExist = true
|
||||
appID = app.id
|
||||
if app.isActive() {
|
||||
appActive = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
if appExist && appActive {
|
||||
continue
|
||||
}
|
||||
if appExist && !appActive {
|
||||
if appExist {
|
||||
pn.DeleteApp(*config)
|
||||
}
|
||||
if config.retryNum > 0 {
|
||||
gLog.Printf(LevelINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum)
|
||||
gLog.Printf(LevelINFO, "detect app %s(%d) disconnect, reconnecting the %d times...", config.AppName, appID, config.retryNum)
|
||||
if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min
|
||||
config.retryNum = 0
|
||||
}
|
||||
@@ -135,12 +135,17 @@ func (pn *P2PNetwork) runAll() {
|
||||
increase = 900
|
||||
}
|
||||
config.nextRetryTime = time.Now().Add(time.Second * time.Duration(increase)) // exponential increase retry time. 1.3^x
|
||||
go pn.AddApp(*config)
|
||||
config.connectTime = time.Now()
|
||||
gConf.mtx.Unlock() // AddApp will take a period of time
|
||||
err := pn.AddApp(*config)
|
||||
gConf.mtx.Lock()
|
||||
if err != nil {
|
||||
config.errMsg = err.Error()
|
||||
}
|
||||
}
|
||||
}
|
||||
func (pn *P2PNetwork) autorunApp() {
|
||||
gLog.Println(LevelINFO, "autorunApp start")
|
||||
// TODO: use gConf to check reconnect
|
||||
for pn.running {
|
||||
time.Sleep(time.Second)
|
||||
if !pn.online {
|
||||
@@ -205,6 +210,7 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint
|
||||
return t, rspID.ID, rsp.Mode, err
|
||||
}
|
||||
|
||||
// use *AppConfig to save status
|
||||
func (pn *P2PNetwork) AddApp(config AppConfig) error {
|
||||
gLog.Printf(LevelINFO, "addApp %s to %s:%s:%d start", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
|
||||
defer gLog.Printf(LevelINFO, "addApp %s to %s:%s:%d end", config.AppName, config.PeerNode, config.DstHost, config.DstPort)
|
||||
@@ -462,10 +468,8 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
|
||||
pn.serverTs = rsp.Ts
|
||||
pn.config.Token = rsp.Token
|
||||
pn.config.User = rsp.User
|
||||
gConf.mtx.Lock()
|
||||
gConf.Network.Token = rsp.Token
|
||||
gConf.Network.User = rsp.User
|
||||
gConf.mtx.Unlock()
|
||||
gConf.setToken(rsp.Token)
|
||||
gConf.setUser(rsp.User)
|
||||
gConf.save()
|
||||
pn.localTs = time.Now().Unix()
|
||||
gLog.Printf(LevelINFO, "login ok. user=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Ts, pn.localTs)
|
||||
|
||||
58
p2ptunnel.go
58
p2ptunnel.go
@@ -276,7 +276,7 @@ func (t *P2PTunnel) readLoop() {
|
||||
gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID)
|
||||
continue
|
||||
}
|
||||
overlayConn, ok := s.(*overlayTCP)
|
||||
overlayConn, ok := s.(*overlayConn)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@@ -333,35 +333,34 @@ func (t *P2PTunnel) readLoop() {
|
||||
|
||||
overlayID := req.ID
|
||||
gLog.Printf(LevelDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req)
|
||||
var conn net.Conn
|
||||
if req.Protocol == "udp" {
|
||||
conn, err = net.DialTimeout("udp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
|
||||
} else {
|
||||
conn, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
|
||||
}
|
||||
if err != nil {
|
||||
gLog.Println(LevelERROR, err)
|
||||
continue
|
||||
}
|
||||
otcp := overlayTCP{
|
||||
oConn := overlayConn{
|
||||
tunnel: t,
|
||||
conn: conn,
|
||||
id: overlayID,
|
||||
isClient: false,
|
||||
rtid: req.RelayTunnelID,
|
||||
appID: req.AppID,
|
||||
appKey: GetKey(req.AppID),
|
||||
}
|
||||
// calc key bytes for encrypt
|
||||
if otcp.appKey != 0 {
|
||||
encryptKey := make([]byte, 16)
|
||||
binary.LittleEndian.PutUint64(encryptKey, otcp.appKey)
|
||||
binary.LittleEndian.PutUint64(encryptKey[8:], otcp.appKey)
|
||||
otcp.appKeyBytes = encryptKey
|
||||
if req.Protocol == "udp" {
|
||||
oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort})
|
||||
} else {
|
||||
oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5)
|
||||
}
|
||||
if err != nil {
|
||||
gLog.Println(LevelERROR, err)
|
||||
continue
|
||||
}
|
||||
|
||||
t.overlayConns.Store(otcp.id, &otcp)
|
||||
go otcp.run()
|
||||
// calc key bytes for encrypt
|
||||
if oConn.appKey != 0 {
|
||||
encryptKey := make([]byte, 16)
|
||||
binary.LittleEndian.PutUint64(encryptKey, oConn.appKey)
|
||||
binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey)
|
||||
oConn.appKeyBytes = encryptKey
|
||||
}
|
||||
|
||||
t.overlayConns.Store(oConn.id, &oConn)
|
||||
go oConn.run()
|
||||
case MsgOverlayDisconnectReq:
|
||||
req := OverlayDisconnectReq{}
|
||||
err := json.Unmarshal(body, &req)
|
||||
@@ -373,8 +372,8 @@ func (t *P2PTunnel) readLoop() {
|
||||
gLog.Printf(LevelDEBUG, "%d disconnect overlay connection %d", t.id, overlayID)
|
||||
i, ok := t.overlayConns.Load(overlayID)
|
||||
if ok {
|
||||
otcp := i.(*overlayTCP)
|
||||
otcp.running = false
|
||||
oConn := i.(*overlayConn)
|
||||
oConn.running = false
|
||||
}
|
||||
default:
|
||||
}
|
||||
@@ -411,9 +410,16 @@ func (t *P2PTunnel) listen() error {
|
||||
|
||||
func (t *P2PTunnel) closeOverlayConns(appID uint64) {
|
||||
t.overlayConns.Range(func(_, i interface{}) bool {
|
||||
otcp := i.(*overlayTCP)
|
||||
if otcp.appID == appID {
|
||||
otcp.conn.Close()
|
||||
oConn := i.(*overlayConn)
|
||||
if oConn.appID == appID {
|
||||
if oConn.connTCP != nil {
|
||||
oConn.connTCP.Close()
|
||||
oConn.connTCP = nil
|
||||
}
|
||||
if oConn.connUDP != nil {
|
||||
oConn.connUDP.Close()
|
||||
oConn.connUDP = nil
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const OpenP2PVersion = "1.2.0"
|
||||
const OpenP2PVersion = "1.4.2"
|
||||
const ProducnName string = "openp2p"
|
||||
|
||||
type openP2PHeader struct {
|
||||
@@ -117,7 +117,7 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
ReadBuffLen = 1024
|
||||
ReadBuffLen = 4096 // for UDP maybe not enough
|
||||
NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow
|
||||
TunnelHeartbeatTime = time.Second * 15
|
||||
TunnelIdleTimeout = time.Minute
|
||||
@@ -134,6 +134,7 @@ const (
|
||||
PublicIPEchoTimeout = time.Second * 3
|
||||
NatTestTimeout = time.Second * 10
|
||||
ClientAPITimeout = time.Second * 10
|
||||
MaxDirectTry = 5
|
||||
)
|
||||
|
||||
// NATNone has public ip
|
||||
@@ -270,7 +271,7 @@ type ReportConnect struct {
|
||||
NatType int `json:"natType,omitempty"`
|
||||
PeerNode string `json:"peerNode,omitempty"`
|
||||
DstPort int `json:"dstPort,omitempty"`
|
||||
DstHost string `json:"dsdtHost,omitempty"`
|
||||
DstHost string `json:"dstHost,omitempty"`
|
||||
PeerUser string `json:"peerUser,omitempty"`
|
||||
PeerNatType int `json:"peerNatType,omitempty"`
|
||||
PeerIP string `json:"peerIP,omitempty"`
|
||||
@@ -298,6 +299,7 @@ type AppInfo struct {
|
||||
RelayMode string `json:"relayMode,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
RetryTime string `json:"retryTime,omitempty"`
|
||||
ConnectTime string `json:"connectTime,omitempty"`
|
||||
IsActive int `json:"isActive,omitempty"`
|
||||
Enabled int `json:"enabled,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user