diff --git a/README-ZH.md b/README-ZH.md index c7acddc..7e9ad28 100644 --- a/README-ZH.md +++ b/README-ZH.md @@ -95,6 +95,7 @@ Windows默认会阻止没有花钱买它家证书签名过的程序,选择“ 服务端有个调度模型,根据带宽、ping值、稳定性、服务时长,尽可能地使共享节点均匀地提供服务。连接共享节点使用TOTP密码,hmac-sha256算法校验,它是一次性密码,和我们平时使用的手机验证码或银行密码器一样的原理。 ## 编译 +go version go1.18.1+ cd到代码根目录,执行 ``` export GOPROXY=https://goproxy.io,direct diff --git a/README.md b/README.md index 134bb3e..4119680 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ That's right, the relay node is naturally an man-in-middle, so AES encryption is The server side has a scheduling model, which calculate bandwith, ping value,stability and service duration to provide a well-proportioned service to every share node. It uses TOTP(Time-based One-time Password) with hmac-sha256 algorithem, its theory as same as the cellphone validation code or bank cipher coder. ## Build +go version go1.18.1+ cd root directory of the socure code and execute ``` export GOPROXY=https://goproxy.io,direct diff --git a/common.go b/common.go index fd9d397..8aa228e 100644 --- a/common.go +++ b/common.go @@ -12,6 +12,8 @@ import ( "net/http" "os" "os/exec" + "strconv" + "strings" "time" ) @@ -77,6 +79,7 @@ func pkcs7UnPadding(origData []byte, dataLen int) ([]byte, error) { return origData[:(dataLen - unPadLen)], nil } +// AES-CBC func encryptBytes(key []byte, out, in []byte, plainLen int) ([]byte, error) { if len(key) == 0 { return in[:plainLen], nil @@ -122,20 +125,20 @@ func netInfo() *NetInfo { client := &http.Client{Transport: tr, Timeout: time.Second * 10} r, err := client.Get("https://ifconfig.co/json") if err != nil { - gLog.Println(LevelINFO, "netInfo error:", err) + gLog.Println(LvINFO, "netInfo error:", err) continue } defer r.Body.Close() buf := make([]byte, 1024*64) n, err := r.Body.Read(buf) if err != nil { - gLog.Println(LevelINFO, "netInfo error:", err) + gLog.Println(LvINFO, "netInfo error:", err) continue } rsp := NetInfo{} err = json.Unmarshal(buf[:n], &rsp) if err != nil { - gLog.Printf(LevelERROR, "wrong NetInfo:%s", err) + gLog.Printf(LvERROR, "wrong NetInfo:%s", err) continue } return &rsp @@ -158,3 +161,33 @@ func defaultNodeName() string { } return name } + +const EQUAL int = 0 +const GREATER int = 1 +const LESS int = -1 + +func compareVersion(v1, v2 string) int { + if v1 == v2 { + return EQUAL + } + v1Arr := strings.Split(v1, ".") + v2Arr := strings.Split(v2, ".") + for i, subVer := range v1Arr { + if len(v2Arr) <= i { + return GREATER + } + subv1, _ := strconv.Atoi(subVer) + subv2, _ := strconv.Atoi(v2Arr[i]) + if subv1 > subv2 { + return GREATER + } + if subv1 < subv2 { + return LESS + } + } + return LESS +} + +func IsIPv6(address string) bool { + return strings.Count(address, ":") >= 2 +} diff --git a/common_test.go b/common_test.go index 39f40e3..cd299f1 100644 --- a/common_test.go +++ b/common_test.go @@ -43,3 +43,29 @@ func TestAESCBC(t *testing.T) { func TestNetInfo(t *testing.T) { log.Println(netInfo()) } + +func assertCompareVersion(t *testing.T, v1 string, v2 string, result int) { + if compareVersion(v1, v2) != result { + t.Errorf("compare version %s %s fail\n", v1, v2) + } +} +func TestCompareVersion(t *testing.T) { + // test = + assertCompareVersion(t, "0.98.0", "0.98.0", EQUAL) + assertCompareVersion(t, "0.98", "0.98", EQUAL) + assertCompareVersion(t, "1.4.0", "1.4.0", EQUAL) + assertCompareVersion(t, "1.5.0", "1.5.0", EQUAL) + // test > + assertCompareVersion(t, "0.98.0.22345", "0.98.0.12345", GREATER) + assertCompareVersion(t, "1.98.0.12345", "0.98", GREATER) + assertCompareVersion(t, "10.98.0.12345", "9.98.0.12345", GREATER) + assertCompareVersion(t, "1.4.0", "0.98.0.12345", GREATER) + assertCompareVersion(t, "1.4", "0.98.0.12345", GREATER) + assertCompareVersion(t, "1", "0.98.0.12345", GREATER) + // test < + assertCompareVersion(t, "0.98.0.12345", "0.98.0.12346", LESS) + assertCompareVersion(t, "9.98.0.12345", "10.98.0.12345", LESS) + assertCompareVersion(t, "1.4.2", "1.5.0", LESS) + assertCompareVersion(t, "", "1.5.0", LESS) + +} diff --git a/config.go b/config.go index f87cfc2..8e2a7f7 100644 --- a/config.go +++ b/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "flag" "io/ioutil" + "os" "sync" "time" ) @@ -23,8 +24,12 @@ type AppConfig struct { PeerUser string Enabled int // default:1 // runtime info + peerVersion string peerToken uint64 peerNatType int + hasIPv4 int + IPv6 string + hasUPNPorNATPMP int peerIP string peerConeNatPort int retryNum int @@ -61,7 +66,7 @@ func (c *Config) add(app AppConfig, override bool) { c.mtx.Lock() defer c.mtx.Unlock() if app.SrcPort == 0 || app.DstPort == 0 { - gLog.Println(LevelERROR, "invalid app ", app) + gLog.Println(LvERROR, "invalid app ", app) return } if override { @@ -95,7 +100,7 @@ func (c *Config) save() { data, _ := json.MarshalIndent(c, "", " ") err := ioutil.WriteFile("config.json", data, 0644) if err != nil { - gLog.Println(LevelERROR, "save config.json error:", err) + gLog.Println(LvERROR, "save config.json error:", err) } } @@ -111,7 +116,7 @@ func (c *Config) load() error { } err = json.Unmarshal(data, &c) if err != nil { - gLog.Println(LevelERROR, "parse config.json error:", err) + gLog.Println(LvERROR, "parse config.json error:", err) } return err } @@ -139,16 +144,19 @@ func (c *Config) setShareBandwidth(bw int) { type NetworkConfig struct { // local info - Token uint64 - Node string - User string - localIP string - ipv6 string - mac string - os string - publicIP string - natType int - ShareBandwidth int + Token uint64 + Node string + User string + localIP string + ipv6 string + mac string + os string + publicIP string + natType int + hasIPv4 int + IPv6 string + hasUPNPorNATPMP int + ShareBandwidth int // server info ServerHost string ServerPort int @@ -156,22 +164,28 @@ type NetworkConfig struct { UDPPort2 int } -func parseParams() { - serverHost := flag.String("serverhost", "api.openp2p.cn", "server host ") +func parseParams(subCommand string) { + fset := flag.NewFlagSet(subCommand, flag.ExitOnError) + serverHost := fset.String("serverhost", "api.openp2p.cn", "server host ") // serverHost := flag.String("serverhost", "127.0.0.1", "server host ") // for debug - node := flag.String("node", "", "node name. 8-31 characters") - token := flag.Uint64("token", 0, "token") - peerNode := flag.String("peernode", "", "peer node name that you want to connect") - dstIP := flag.String("dstip", "127.0.0.1", "destination ip ") - dstPort := flag.Int("dstport", 0, "destination port ") - srcPort := flag.Int("srcport", 0, "source port ") - protocol := flag.String("protocol", "tcp", "tcp or udp") - appName := flag.String("appname", "", "app name") - shareBandwidth := flag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit") - daemonMode := flag.Bool("d", false, "daemonMode") - notVerbose := flag.Bool("nv", false, "not log console") - logLevel := flag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error") - flag.Parse() + token := fset.Uint64("token", 0, "token") + node := fset.String("node", "", "node name. 8-31 characters. if not set, it will be hostname") + peerNode := fset.String("peernode", "", "peer node name that you want to connect") + dstIP := fset.String("dstip", "127.0.0.1", "destination ip ") + dstPort := fset.Int("dstport", 0, "destination port ") + srcPort := fset.Int("srcport", 0, "source port ") + protocol := fset.String("protocol", "tcp", "tcp or udp") + appName := fset.String("appname", "", "app name") + shareBandwidth := fset.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit") + daemonMode := fset.Bool("d", false, "daemonMode") + notVerbose := fset.Bool("nv", false, "not log console") + newconfig := fset.Bool("newconfig", false, "not load existing config.json") + logLevel := fset.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error") + if subCommand == "" { // no subcommand + fset.Parse(os.Args[1:]) + } else { + fset.Parse(os.Args[2:]) + } config := AppConfig{Enabled: 1} config.PeerNode = *peerNode @@ -180,14 +194,17 @@ func parseParams() { config.SrcPort = *srcPort config.Protocol = *protocol config.AppName = *appName - gConf.load() + if !*newconfig { + gConf.load() // load old config. otherwise will clear all apps + } + gConf.LogLevel = *logLevel if config.SrcPort != 0 { gConf.add(config, true) } // gConf.mtx.Lock() // when calling this func it's single-thread no lock gConf.daemonMode = *daemonMode // spec paramters in commandline will always be used - flag.Visit(func(f *flag.Flag) { + fset.Visit(func(f *flag.Flag) { if f.Name == "sharebandwidth" { gConf.Network.ShareBandwidth = *shareBandwidth } @@ -205,16 +222,20 @@ func parseParams() { if gConf.Network.ServerHost == "" { gConf.Network.ServerHost = *serverHost } - if gConf.Network.Node == "" { - if *node == "" { // config and param's node both empty - hostname := defaultNodeName() - node = &hostname - } - gConf.Network.Node = *node - } if *token != 0 { gConf.Network.Token = *token } + if *node != "" { + if len(*node) < 8 { + gLog.Println(LvERROR, ErrNodeTooShort) + os.Exit(9) + } + gConf.Network.Node = *node + } else { + if gConf.Network.Node == "" { // if node name not set. use os.Hostname + gConf.Network.Node = defaultNodeName() + } + } if gConf.LogLevel == IntValueNotSet { gConf.LogLevel = *logLevel } diff --git a/daemon.go b/daemon.go index 14e106c..f571798 100644 --- a/daemon.go +++ b/daemon.go @@ -1,13 +1,9 @@ package main import ( - "flag" "fmt" - "io" "os" - "os/exec" "path/filepath" - "strings" "time" "github.com/kardianos/service" @@ -19,34 +15,34 @@ type daemon struct { } func (d *daemon) Start(s service.Service) error { - gLog.Println(LevelINFO, "daemon start") + gLog.Println(LvINFO, "daemon start") return nil } func (d *daemon) Stop(s service.Service) error { - gLog.Println(LevelINFO, "service stop") + gLog.Println(LvINFO, "service stop") d.running = false if d.proc != nil { - gLog.Println(LevelINFO, "stop worker") + gLog.Println(LvINFO, "stop worker") d.proc.Kill() } if service.Interactive() { - gLog.Println(LevelINFO, "stop daemon") + gLog.Println(LvINFO, "stop daemon") os.Exit(0) } return nil } func (d *daemon) run() { - gLog.Println(LevelINFO, "daemon run start") - defer gLog.Println(LevelINFO, "daemon run end") + gLog.Println(LvINFO, "daemon run start") + defer gLog.Println(LvINFO, "daemon run end") d.running = true binPath, _ := os.Executable() mydir, err := os.Getwd() if err != nil { fmt.Println(err) } - gLog.Println(LevelINFO, mydir) + gLog.Println(LvINFO, mydir) conf := &service.Config{ Name: ProducnName, DisplayName: ProducnName, @@ -71,14 +67,14 @@ func (d *daemon) run() { dumpFile := filepath.Join("log", "dump.log") f, err := os.Create(filepath.Join(tmpDump)) if err != nil { - gLog.Printf(LevelERROR, "start worker error:%s", err) + gLog.Printf(LvERROR, "start worker error:%s", err) return } - gLog.Println(LevelINFO, "start worker process, args:", args) + gLog.Println(LvINFO, "start worker process, args:", args) execSpec := &os.ProcAttr{Env: append(os.Environ(), "GOTRACEBACK=crash"), Files: []*os.File{os.Stdin, os.Stdout, f}} p, err := os.StartProcess(binPath, args, execSpec) if err != nil { - gLog.Printf(LevelERROR, "start worker error:%s", err) + gLog.Printf(LvERROR, "start worker error:%s", err) return } d.proc = p @@ -87,12 +83,12 @@ func (d *daemon) run() { time.Sleep(time.Second) err = os.Rename(tmpDump, dumpFile) if err != nil { - gLog.Printf(LevelERROR, "rename dump error:%s", err) + gLog.Printf(LvERROR, "rename dump error:%s", err) } if !d.running { return } - gLog.Printf(LevelERROR, "worker stop, restart it after 10s") + gLog.Printf(LvERROR, "worker stop, restart it after 10s") time.Sleep(time.Second * 10) } } @@ -117,163 +113,3 @@ func (d *daemon) Control(ctrlComm string, exeAbsPath string, args []string) erro return nil } - -// examples: -// listen: -// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 -// listen and build p2papp: -// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 -peernode hhdhome-n1 -dstip 127.0.0.1 -dstport 50022 -protocol tcp -srcport 22 -func install() { - gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion) - gLog.Println(LevelINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com") - gLog.Println(LevelINFO, "install start") - defer gLog.Println(LevelINFO, "install end") - // auto uninstall - err := os.MkdirAll(defaultInstallPath, 0775) - - if err != nil { - gLog.Printf(LevelERROR, "MkdirAll %s error:%s", defaultInstallPath, err) - return - } - err = os.Chdir(defaultInstallPath) - if err != nil { - gLog.Println(LevelERROR, "cd error:", err) - return - } - - uninstall() - // save config file - installFlag := flag.NewFlagSet("install", flag.ExitOnError) - serverHost := installFlag.String("serverhost", "api.openp2p.cn", "server host ") - // serverHost := flag.String("serverhost", "127.0.0.1", "server host ") // for debug - token := installFlag.Uint64("token", 0, "token") - node := installFlag.String("node", "", "node name. 8-31 characters. if not set, it will be hostname") - peerNode := installFlag.String("peernode", "", "peer node name that you want to connect") - dstIP := installFlag.String("dstip", "127.0.0.1", "destination ip ") - dstPort := installFlag.Int("dstport", 0, "destination port ") - srcPort := installFlag.Int("srcport", 0, "source port ") - protocol := installFlag.String("protocol", "tcp", "tcp or udp") - appName := flag.String("appname", "", "app name") - shareBandwidth := installFlag.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit") - logLevel := installFlag.Int("loglevel", 1, "0:debug 1:info 2:warn 3:error") - installFlag.Parse(os.Args[2:]) - - gConf.load() // load old config. otherwise will clear all apps - gConf.LogLevel = *logLevel - gConf.Network.ServerHost = *serverHost - gConf.Network.Token = *token - if *node != "" { - if len(*node) < 8 { - gLog.Println(LevelERROR, ErrNodeTooShort) - os.Exit(9) - } - gConf.Network.Node = *node - } else { - if gConf.Network.Node == "" { // if node name not set. use os.Hostname - gConf.Network.Node = defaultNodeName() - } - } - gConf.Network.ServerPort = 27183 - gConf.Network.UDPPort1 = 27182 - gConf.Network.UDPPort2 = 27183 - gConf.Network.ShareBandwidth = *shareBandwidth - config := AppConfig{Enabled: 1} - config.PeerNode = *peerNode - config.DstHost = *dstIP - config.DstPort = *dstPort - config.SrcPort = *srcPort - config.Protocol = *protocol - config.AppName = *appName - if config.SrcPort != 0 { - gConf.add(config, true) - } - gConf.save() - targetPath := filepath.Join(defaultInstallPath, defaultBinName) - d := daemon{} - // copy files - - binPath, _ := os.Executable() - src, errFiles := os.Open(binPath) // can not use args[0], on Windows call openp2p is ok(=openp2p.exe) - if errFiles != nil { - gLog.Printf(LevelERROR, "os.OpenFile %s error:%s", os.Args[0], errFiles) - return - } - - dst, errFiles := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0775) - if errFiles != nil { - gLog.Printf(LevelERROR, "os.OpenFile %s error:%s", targetPath, errFiles) - return - } - - _, errFiles = io.Copy(dst, src) - if errFiles != nil { - gLog.Printf(LevelERROR, "io.Copy error:%s", errFiles) - return - } - src.Close() - dst.Close() - - // install system service - gLog.Println(LevelINFO, "targetPath:", targetPath) - err = d.Control("install", targetPath, []string{"-d"}) - if err == nil { - gLog.Println(LevelINFO, "install system service ok.") - } - time.Sleep(time.Second * 2) - err = d.Control("start", targetPath, []string{"-d"}) - if err != nil { - gLog.Println(LevelERROR, "start openp2p service error:", err) - } else { - gLog.Println(LevelINFO, "start openp2p service ok.") - } -} - -func installByFilename() { - params := strings.Split(filepath.Base(os.Args[0]), "-") - if len(params) < 4 { - return - } - serverHost := params[1] - token := params[2] - gLog.Println(LevelINFO, "install start") - targetPath := os.Args[0] - args := []string{"install"} - args = append(args, "-serverhost") - args = append(args, serverHost) - args = append(args, "-token") - args = append(args, token) - env := os.Environ() - cmd := exec.Command(targetPath, args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - cmd.Env = env - err := cmd.Run() - if err != nil { - gLog.Println(LevelERROR, "install by filename, start process error:", err) - return - } - gLog.Println(LevelINFO, "install end") - fmt.Println("Press the Any Key to exit") - fmt.Scanln() - os.Exit(0) -} -func uninstall() { - gLog.Println(LevelINFO, "uninstall start") - defer gLog.Println(LevelINFO, "uninstall end") - d := daemon{} - err := d.Control("stop", "", nil) - if err != nil { // service maybe not install - return - } - err = d.Control("uninstall", "", nil) - if err != nil { - gLog.Println(LevelERROR, "uninstall system service error:", err) - } else { - gLog.Println(LevelINFO, "uninstall system service ok.") - } - binPath := filepath.Join(defaultInstallPath, defaultBinName) - os.Remove(binPath + "0") - os.Remove(binPath) - // os.RemoveAll(defaultInstallPath) // reserve config.json -} diff --git a/go.mod b/go.mod index 58de87e..4c8db7a 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,27 @@ module openp2p -go 1.16 +go 1.18 require ( github.com/gorilla/websocket v1.4.2 github.com/kardianos/service v1.2.0 - github.com/lucas-clemente/quic-go v0.24.0 + github.com/lucas-clemente/quic-go v0.27.0 golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 ) + +require ( + github.com/cheekybits/genny v1.0.0 // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect + github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect + github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect + github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/onsi/ginkgo v1.16.4 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/mod v0.4.2 // indirect + golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect + golang.org/x/tools v0.1.1 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect +) diff --git a/handlepush.go b/handlepush.go index 94de9af..ec45976 100644 --- a/handlepush.go +++ b/handlepush.go @@ -17,37 +17,38 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { if err != nil { return err } - gLog.Printf(LevelDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) + gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) switch subType { case MsgPushConnectReq: req := PushConnectReq{} err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgPushConnectReq:%s", err) + gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err) return err } - gLog.Printf(LevelINFO, "%s is connecting...", req.From) - gLog.Println(LevelDEBUG, "push connect response to ", req.From) + gLog.Printf(LvINFO, "%s is connecting...", req.From) + gLog.Println(LvDEBUG, "push connect response to ", req.From) // verify totp token or token if VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts VerifyTOTP(req.Token, pn.config.Token, time.Now().Unix()) || (req.FromToken == pn.config.Token) { - gLog.Printf(LevelINFO, "Access Granted\n") + gLog.Printf(LvINFO, "Access Granted\n") config := AppConfig{} config.peerNatType = req.NatType config.peerConeNatPort = req.ConeNatPort config.peerIP = req.FromIP config.PeerNode = req.From + config.peerVersion = req.Version // share relay node will limit bandwidth if req.FromToken != pn.config.Token { - gLog.Printf(LevelINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth) + gLog.Printf(LvINFO, "set share bandwidth %d mbps", pn.config.ShareBandwidth) config.shareBandwidth = pn.config.ShareBandwidth } // go pn.AddTunnel(config, req.ID) go pn.addDirectTunnel(config, req.ID) break } - gLog.Println(LevelERROR, "Access Denied:", req.From) + gLog.Println(LvERROR, "Access Denied:", req.From) rsp := PushConnectRsp{ Error: 1, Detail: fmt.Sprintf("connect to %s error: Access Denied", pn.config.Node), @@ -59,19 +60,19 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { rsp := PushRsp{} err := json.Unmarshal(msg[openP2PHeaderSize:], &rsp) if err != nil { - gLog.Printf(LevelERROR, "wrong pushRsp:%s", err) + gLog.Printf(LvERROR, "wrong pushRsp:%s", err) return err } if rsp.Error == 0 { - gLog.Printf(LevelDEBUG, "push ok, detail:%s", rsp.Detail) + gLog.Printf(LvDEBUG, "push ok, detail:%s", rsp.Detail) } else { - gLog.Printf(LevelERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail) + gLog.Printf(LvERROR, "push error:%d, detail:%s", rsp.Error, rsp.Detail) } case MsgPushAddRelayTunnelReq: req := AddRelayTunnelReq{} err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) if err != nil { - gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) return err } config := AppConfig{} @@ -83,12 +84,19 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { // notify peer relay ready msg := TunnelMsg{ID: t.id} pn.push(r.From, MsgPushAddRelayTunnelRsp, msg) - SaveKey(req.AppID, req.AppKey) } }(req) + case MsgPushAPPKey: + req := APPKeySync{} + err := json.Unmarshal(msg[openP2PHeaderSize+PushHeaderSize:], &req) + if err != nil { + gLog.Printf(LvERROR, "wrong APPKeySync:%s", err) + return err + } + SaveKey(req.AppID, req.AppKey) case MsgPushUpdate: - gLog.Println(LevelINFO, "MsgPushUpdate") + gLog.Println(LvINFO, "MsgPushUpdate") update() // download new version first, then exec ./openp2p update targetPath := filepath.Join(defaultInstallPath, defaultBinName) args := []string{"update"} @@ -104,11 +112,11 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } return err case MsgPushRestart: - gLog.Println(LevelINFO, "MsgPushRestart") + gLog.Println(LvINFO, "MsgPushRestart") os.Exit(0) return err case MsgPushReportApps: - gLog.Println(LevelINFO, "MsgPushReportApps") + gLog.Println(LvINFO, "MsgPushReportApps") req := ReportApps{} gConf.mtx.Lock() defer gConf.mtx.Unlock() @@ -147,11 +155,11 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } pn.write(MsgReport, MsgReportApps, &req) case MsgPushEditApp: - gLog.Println(LevelINFO, "MsgPushEditApp") + gLog.Println(LvINFO, "MsgPushEditApp") newApp := AppInfo{} err := json.Unmarshal(msg[openP2PHeaderSize:], &newApp) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:])) + gLog.Printf(LvERROR, "wrong MsgPushEditApp:%s %s", err, string(msg[openP2PHeaderSize:])) return err } oldConf := AppConfig{Enabled: 1} @@ -175,11 +183,11 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { // pn.AddApp(config) // TODO: report result case MsgPushEditNode: - gLog.Println(LevelINFO, "MsgPushEditNode") + gLog.Println(LvINFO, "MsgPushEditNode") req := EditNode{} err := json.Unmarshal(msg[openP2PHeaderSize:], &req) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:])) + gLog.Printf(LvERROR, "wrong MsgPushEditNode:%s %s", err, string(msg[openP2PHeaderSize:])) return err } gConf.setNode(req.NewName) @@ -188,15 +196,15 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { // TODO: hot reload os.Exit(0) case MsgPushSwitchApp: - gLog.Println(LevelINFO, "MsgPushSwitchApp") + gLog.Println(LvINFO, "MsgPushSwitchApp") app := AppInfo{} err := json.Unmarshal(msg[openP2PHeaderSize:], &app) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:])) + gLog.Printf(LvERROR, "wrong MsgPushSwitchApp:%s %s", err, string(msg[openP2PHeaderSize:])) return err } config := AppConfig{Enabled: app.Enabled, SrcPort: app.SrcPort, Protocol: app.Protocol} - gLog.Println(LevelINFO, app.AppName, " switch to ", app.Enabled) + gLog.Println(LvINFO, app.AppName, " switch to ", app.Enabled) gConf.switchApp(config, app.Enabled) if app.Enabled == 0 { // disable APP diff --git a/holepunch.go b/holepunch.go index b14fbbd..9598c28 100644 --- a/holepunch.go +++ b/holepunch.go @@ -11,8 +11,8 @@ import ( ) func handshakeC2C(t *P2PTunnel) (err error) { - gLog.Printf(LevelDEBUG, "handshakeC2C %s:%d:%d to %s:%d", t.pn.config.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) - defer gLog.Printf(LevelDEBUG, "handshakeC2C ok") + gLog.Printf(LvDEBUG, "handshakeC2C %s:%d:%d to %s:%d", t.pn.config.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) + defer gLog.Printf(LvDEBUG, "handshakeC2C ok") conn, err := net.ListenUDP("udp", t.la) if err != nil { return err @@ -20,40 +20,40 @@ func handshakeC2C(t *P2PTunnel) (err error) { defer conn.Close() _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) + gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) return err } ra, head, _, _, err := UDPRead(conn, 5000) if err != nil { time.Sleep(time.Millisecond * 200) - gLog.Println(LevelDEBUG, err, ", return this error when ip was not reachable, retry read") + gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read") ra, head, _, _, err = UDPRead(conn, 5000) if err != nil { - gLog.Println(LevelDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) + gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) return err } } t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) // cone server side if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { - gLog.Printf(LevelDEBUG, "read %d handshake ", t.id) + gLog.Printf(LvDEBUG, "read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) _, head, _, _, err = UDPRead(conn, 5000) if err != nil { - gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) + gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) return err } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id) + gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id) return nil } } // cone client side will only read handshake ack if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LevelDEBUG, "read %d handshake ack ", t.id) + gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id) _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Println(LevelDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) + gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) } return err } @@ -61,8 +61,8 @@ func handshakeC2C(t *P2PTunnel) (err error) { } func handshakeC2S(t *P2PTunnel) error { - gLog.Printf(LevelDEBUG, "handshakeC2S start") - defer gLog.Printf(LevelDEBUG, "handshakeC2S end") + gLog.Printf(LvDEBUG, "handshakeC2S start") + defer gLog.Printf(LvDEBUG, "handshakeC2S end") // even if read timeout, continue handshake t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout) r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -73,7 +73,7 @@ func handshakeC2S(t *P2PTunnel) error { } defer conn.Close() go func() error { - gLog.Printf(LevelDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) + gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) for i := 0; i < SymmetricHandshakeNum; i++ { // TODO: auto calc cost time time.Sleep(SymmetricHandshakeInterval) @@ -83,35 +83,35 @@ func handshakeC2S(t *P2PTunnel) error { } _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) if err != nil { - gLog.Println(LevelDEBUG, "handshakeC2S write MsgPunchHandshake error:", err) + gLog.Println(LvDEBUG, "handshakeC2S write MsgPunchHandshake error:", err) return err } } - gLog.Println(LevelDEBUG, "send symmetric handshake end") + gLog.Println(LvDEBUG, "send symmetric handshake end") return nil }() deadline := time.Now().Add(SymmetricHandshakeAckTimeout) err = conn.SetReadDeadline(deadline) if err != nil { - gLog.Println(LevelERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") + gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") return err } // read response of the punching hole ok port result := make([]byte, 1024) _, dst, err := conn.ReadFrom(result) if err != nil { - gLog.Println(LevelERROR, "handshakeC2S wait timeout") + gLog.Println(LvERROR, "handshakeC2S wait timeout") return err } head := &openP2PHeader{} err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { - gLog.Println(LevelERROR, "parse p2pheader error:", err) + gLog.Println(LvERROR, "parse p2pheader error:", err) return err } t.ra, _ = net.ResolveUDPAddr("udp", dst.String()) if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LevelDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String()) + gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String()) _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) return err } @@ -119,11 +119,11 @@ func handshakeC2S(t *P2PTunnel) error { } func handshakeS2C(t *P2PTunnel) error { - gLog.Printf(LevelDEBUG, "handshakeS2C start") - defer gLog.Printf(LevelDEBUG, "handshakeS2C end") + gLog.Printf(LvDEBUG, "handshakeS2C start") + defer gLog.Printf(LvDEBUG, "handshakeS2C end") gotCh := make(chan *net.UDPAddr, 5) // sequencely udp send handshake, do not parallel send - gLog.Printf(LevelDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) + gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) gotIt := false gotMtx := sync.Mutex{} for i := 0; i < SymmetricHandshakeNum; i++ { @@ -132,7 +132,7 @@ func handshakeS2C(t *P2PTunnel) error { go func(t *P2PTunnel) error { conn, err := net.ListenUDP("udp", nil) if err != nil { - gLog.Printf(LevelDEBUG, "listen error") + gLog.Printf(LvDEBUG, "listen error") return err } defer conn.Close() @@ -150,15 +150,15 @@ func handshakeS2C(t *P2PTunnel) error { gotIt = true t.la, _ = net.ResolveUDPAddr("udp", conn.LocalAddr().String()) if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { - gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ", t.id) + gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) _, head, _, _, err = UDPRead(conn, 5000) if err != nil { - gLog.Println(LevelDEBUG, "handshakeS2C handshake error") + gLog.Println(LvDEBUG, "handshakeS2C handshake error") return err } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LevelDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) + gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) gotCh <- t.la return nil } @@ -166,15 +166,15 @@ func handshakeS2C(t *P2PTunnel) error { return nil }(t) } - gLog.Printf(LevelDEBUG, "send symmetric handshake end") - gLog.Println(LevelDEBUG, "handshakeS2C ready, notify peer connect") + gLog.Printf(LvDEBUG, "send symmetric handshake end") + gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect") t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) select { case <-time.After(SymmetricHandshakeAckTimeout): return fmt.Errorf("wait handshake failed") case la := <-gotCh: - gLog.Println(LevelDEBUG, "symmetric handshake ok", la) + gLog.Println(LvDEBUG, "symmetric handshake ok", la) } return nil } diff --git a/install.go b/install.go new file mode 100644 index 0000000..7b3e07c --- /dev/null +++ b/install.go @@ -0,0 +1,127 @@ +package main + +import ( + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +// examples: +// listen: +// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 +// listen and build p2papp: +// ./openp2p install -node hhd1207-222 -token YOUR-TOKEN -sharebandwidth 0 -peernode hhdhome-n1 -dstip 127.0.0.1 -dstport 50022 -protocol tcp -srcport 22 +func install() { + gLog.Println(LvINFO, "openp2p start. version: ", OpenP2PVersion) + gLog.Println(LvINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com") + gLog.Println(LvINFO, "install start") + defer gLog.Println(LvINFO, "install end") + // auto uninstall + err := os.MkdirAll(defaultInstallPath, 0775) + + if err != nil { + gLog.Printf(LvERROR, "MkdirAll %s error:%s", defaultInstallPath, err) + return + } + err = os.Chdir(defaultInstallPath) + if err != nil { + gLog.Println(LvERROR, "cd error:", err) + return + } + + uninstall() + // save config file + parseParams("install") + targetPath := filepath.Join(defaultInstallPath, defaultBinName) + d := daemon{} + // copy files + + binPath, _ := os.Executable() + src, errFiles := os.Open(binPath) // can not use args[0], on Windows call openp2p is ok(=openp2p.exe) + if errFiles != nil { + gLog.Printf(LvERROR, "os.OpenFile %s error:%s", os.Args[0], errFiles) + return + } + + dst, errFiles := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0775) + if errFiles != nil { + gLog.Printf(LvERROR, "os.OpenFile %s error:%s", targetPath, errFiles) + return + } + + _, errFiles = io.Copy(dst, src) + if errFiles != nil { + gLog.Printf(LvERROR, "io.Copy error:%s", errFiles) + return + } + src.Close() + dst.Close() + + // install system service + gLog.Println(LvINFO, "targetPath:", targetPath) + err = d.Control("install", targetPath, []string{"-d"}) + if err == nil { + gLog.Println(LvINFO, "install system service ok.") + } + time.Sleep(time.Second * 2) + err = d.Control("start", targetPath, []string{"-d"}) + if err != nil { + gLog.Println(LvERROR, "start openp2p service error:", err) + } else { + gLog.Println(LvINFO, "start openp2p service ok.") + } +} + +func installByFilename() { + params := strings.Split(filepath.Base(os.Args[0]), "-") + if len(params) < 4 { + return + } + serverHost := params[1] + token := params[2] + gLog.Println(LvINFO, "install start") + targetPath := os.Args[0] + args := []string{"install"} + args = append(args, "-serverhost") + args = append(args, serverHost) + args = append(args, "-token") + args = append(args, token) + env := os.Environ() + cmd := exec.Command(targetPath, args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + cmd.Env = env + err := cmd.Run() + if err != nil { + gLog.Println(LvERROR, "install by filename, start process error:", err) + return + } + gLog.Println(LvINFO, "install end") + fmt.Println("Press the Any Key to exit") + fmt.Scanln() + os.Exit(0) +} +func uninstall() { + gLog.Println(LvINFO, "uninstall start") + defer gLog.Println(LvINFO, "uninstall end") + d := daemon{} + err := d.Control("stop", "", nil) + if err != nil { // service maybe not install + return + } + err = d.Control("uninstall", "", nil) + if err != nil { + gLog.Println(LvERROR, "uninstall system service error:", err) + } else { + gLog.Println(LvINFO, "uninstall system service ok.") + } + binPath := filepath.Join(defaultInstallPath, defaultBinName) + os.Remove(binPath + "0") + os.Remove(binPath) + // os.RemoveAll(defaultInstallPath) // reserve config.json +} diff --git a/log.go b/log.go index 8c28a8a..33b6aa4 100644 --- a/log.go +++ b/log.go @@ -8,17 +8,15 @@ import ( "time" ) -// LogLevel ... type LogLevel int -var gLog *V8log +var gLog *logger -// LevelDEBUG ... const ( - LevelDEBUG LogLevel = iota - LevelINFO - LevelWARN - LevelERROR + LvDEBUG LogLevel = iota + LvINFO + LvWARN + LvERROR ) var ( @@ -30,10 +28,10 @@ func init() { logFileNames = make(map[LogLevel]string) loglevel = make(map[LogLevel]string) logFileNames[0] = ".log" - loglevel[LevelDEBUG] = "DEBUG" - loglevel[LevelINFO] = "INFO" - loglevel[LevelWARN] = "WARN" - loglevel[LevelERROR] = "ERROR" + loglevel[LvDEBUG] = "DEBUG" + loglevel[LvINFO] = "INFO" + loglevel[LvWARN] = "WARN" + loglevel[LvERROR] = "ERROR" } @@ -43,25 +41,21 @@ const ( LogFileAndConsole ) -// V8log ... -type V8log struct { +type logger struct { loggers map[LogLevel]*log.Logger files map[LogLevel]*os.File level LogLevel - stopSig chan bool logDir string mtx *sync.Mutex - stoped bool lineEnding string pid int maxLogSize int64 mode int } -// InitLogger ... -func InitLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *V8log { - logger := make(map[LogLevel]*log.Logger) - openedfile := make(map[LogLevel]*os.File) +func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger { + loggers := make(map[LogLevel]*log.Logger) + logfiles := make(map[LogLevel]*os.File) var ( logdir string ) @@ -71,15 +65,15 @@ func InitLogger(path string, filePrefix string, level LogLevel, maxLogSize int64 logdir = path + "/log/" } os.MkdirAll(logdir, 0777) - for l := range logFileNames { - logFilePath := logdir + filePrefix + logFileNames[l] + for lv := range logFileNames { + logFilePath := logdir + filePrefix + logFileNames[lv] f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatal(err) } os.Chmod(logFilePath, 0666) - openedfile[l] = f - logger[l] = log.New(f, "", log.LstdFlags) + logfiles[lv] = f + loggers[lv] = log.New(f, "", log.LstdFlags) } var le string if runtime.GOOS == "windows" { @@ -87,97 +81,84 @@ func InitLogger(path string, filePrefix string, level LogLevel, maxLogSize int64 } else { le = "\n" } - pLog := &V8log{logger, openedfile, level, make(chan bool, 10), logdir, &sync.Mutex{}, false, le, os.Getpid(), maxLogSize, mode} + pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode} go pLog.checkFile() return pLog } -func (vl *V8log) setLevel(level LogLevel) { - vl.mtx.Lock() - defer vl.mtx.Unlock() - vl.level = level +func (l *logger) setLevel(level LogLevel) { + l.mtx.Lock() + defer l.mtx.Unlock() + l.level = level } -func (vl *V8log) setMode(mode int) { - vl.mtx.Lock() - defer vl.mtx.Unlock() - vl.mode = mode +func (l *logger) setMode(mode int) { + l.mtx.Lock() + defer l.mtx.Unlock() + l.mode = mode } -func (vl *V8log) checkFile() { - if vl.maxLogSize <= 0 { +func (l *logger) checkFile() { + if l.maxLogSize <= 0 { return } ticker := time.NewTicker(time.Minute) for { select { case <-ticker.C: - vl.mtx.Lock() - for l, logFile := range vl.files { + l.mtx.Lock() + for lv, logFile := range l.files { f, e := logFile.Stat() if e != nil { continue } - if f.Size() <= vl.maxLogSize { + if f.Size() <= l.maxLogSize { continue } logFile.Close() fname := f.Name() - backupPath := vl.logDir + fname + ".0" + backupPath := l.logDir + fname + ".0" os.Remove(backupPath) - os.Rename(vl.logDir+fname, backupPath) - newFile, e := os.OpenFile(vl.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + os.Rename(l.logDir+fname, backupPath) + newFile, e := os.OpenFile(l.logDir+fname, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if e == nil { - vl.loggers[l].SetOutput(newFile) - vl.files[l] = newFile + l.loggers[lv].SetOutput(newFile) + l.files[lv] = newFile } - } - vl.mtx.Unlock() - case <-vl.stopSig: - } - if vl.stoped { - break + l.mtx.Unlock() } } } -// Printf Warning: report error log depends on this Print format. -func (vl *V8log) Printf(level LogLevel, format string, params ...interface{}) { - vl.mtx.Lock() - defer vl.mtx.Unlock() - if vl.stoped { +func (l *logger) Printf(level LogLevel, format string, params ...interface{}) { + l.mtx.Lock() + defer l.mtx.Unlock() + if level < l.level { return } - if level < vl.level { - return - } - pidAndLevel := []interface{}{vl.pid, loglevel[level]} + pidAndLevel := []interface{}{l.pid, loglevel[level]} params = append(pidAndLevel, params...) - if vl.mode == LogFile || vl.mode == LogFileAndConsole { - vl.loggers[0].Printf("%d %s "+format+vl.lineEnding, params...) + if l.mode == LogFile || l.mode == LogFileAndConsole { + l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...) } - if vl.mode == LogConsole || vl.mode == LogFileAndConsole { - log.Printf("%d %s "+format+vl.lineEnding, params...) + if l.mode == LogConsole || l.mode == LogFileAndConsole { + log.Printf("%d %s "+format+l.lineEnding, params...) } } -// Println ... -func (vl *V8log) Println(level LogLevel, params ...interface{}) { - vl.mtx.Lock() - defer vl.mtx.Unlock() - if vl.stoped { +func (l *logger) Println(level LogLevel, params ...interface{}) { + l.mtx.Lock() + defer l.mtx.Unlock() + if level < l.level { return } - if level < vl.level { - return - } - pidAndLevel := []interface{}{vl.pid, " ", loglevel[level], " "} + pidAndLevel := []interface{}{l.pid, " ", loglevel[level], " "} params = append(pidAndLevel, params...) - params = append(params, vl.lineEnding) - if vl.mode == LogFile || vl.mode == LogFileAndConsole { - vl.loggers[0].Print(params...) + params = append(params, l.lineEnding) + if l.mode == LogFile || l.mode == LogFileAndConsole { + l.loggers[0].Print(params...) } - if vl.mode == LogConsole || vl.mode == LogFileAndConsole { + if l.mode == LogConsole || l.mode == LogFileAndConsole { log.Print(params...) } } diff --git a/nat.go b/nat.go index fc0e231..092769e 100644 --- a/nat.go +++ b/nat.go @@ -3,47 +3,69 @@ package main import ( "encoding/json" "fmt" + "log" "math/rand" "net" "time" ) -func natTest(serverHost string, serverPort int, localPort int, echoPort int) (publicIP string, isPublicIP int, publicPort int, err error) { +func natTest(serverHost string, serverPort int, localPort int, echoPort int) (publicIP string, hasPublicIP int, hasUPNPorNATPMP int, publicPort int, err error) { conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", localPort)) if err != nil { - return "", 0, 0, err + return "", 0, 0, 0, err } defer conn.Close() dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", serverHost, serverPort)) if err != nil { - return "", 0, 0, err + return "", 0, 0, 0, err } // The connection can write data to the desired address. msg, err := newMessage(MsgNATDetect, 0, &NatDetectReq{SrcPort: localPort, EchoPort: echoPort}) _, err = conn.WriteTo(msg, dst) if err != nil { - return "", 0, 0, err + return "", 0, 0, 0, err } deadline := time.Now().Add(NatTestTimeout) err = conn.SetReadDeadline(deadline) if err != nil { - return "", 0, 0, err + return "", 0, 0, 0, err } buffer := make([]byte, 1024) nRead, _, err := conn.ReadFrom(buffer) if err != nil { - gLog.Println(LevelERROR, "NAT detect error:", err) - return "", 0, 0, err + gLog.Println(LvERROR, "NAT detect error:", err) + return "", 0, 0, 0, err } natRsp := NatDetectRsp{} err = json.Unmarshal(buffer[openP2PHeaderSize:nRead], &natRsp) - + hasPublicIP = 0 + hasUPNPorNATPMP = 0 // testing for public ip if echoPort != 0 { - for { - gLog.Printf(LevelDEBUG, "public ip test start %s:%d", natRsp.IP, echoPort) + for i := 0; i < 2; i++ { + if i == 1 { + // test upnp or nat-pmp + nat, err := Discover() + if err != nil { + gLog.Println(LvDEBUG, "could not perform UPNP discover:", err) + break + } + ext, err := nat.GetExternalAddress() + if err != nil { + gLog.Println(LvDEBUG, "could not perform UPNP external address:", err) + break + } + log.Println("PublicIP:", ext) + + externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 60) + if err != nil { + gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort) + break + } + } + gLog.Printf(LvDEBUG, "public ip test start %s:%d", natRsp.IP, echoPort) conn, err := net.ListenUDP("udp", nil) if err != nil { break @@ -60,56 +82,59 @@ func natTest(serverHost string, serverPort int, localPort int, echoPort int) (pu conn.SetReadDeadline(time.Now().Add(PublicIPEchoTimeout)) _, _, err = conn.ReadFromUDP(buf) if err == nil { - gLog.Println(LevelDEBUG, "public ip:YES") - natRsp.IsPublicIP = 1 - } else { - gLog.Println(LevelDEBUG, "public ip:NO") + if i == 1 { + gLog.Println(LvDEBUG, "UPNP or NAT-PMP:YES") + hasUPNPorNATPMP = 1 + } else { + gLog.Println(LvDEBUG, "public ip:YES") + hasPublicIP = 1 + } + break } - break } } - return natRsp.IP, natRsp.IsPublicIP, natRsp.Port, nil + return natRsp.IP, hasPublicIP, hasUPNPorNATPMP, natRsp.Port, nil } -func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, err error) { +func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasUPNPorNATPMP int, err error) { // the random local port may be used by other. localPort := int(rand.Uint32()%10000 + 50000) echoPort := int(rand.Uint32()%10000 + 50000) go echo(echoPort) - ip1, isPublicIP, port1, err := natTest(host, udp1, localPort, echoPort) - gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port1) + ip1, hasPublicIP, hasUPNPorNATPMP, port1, err := natTest(host, udp1, localPort, echoPort) + gLog.Printf(LvDEBUG, "local port:%d nat port:%d", localPort, port1) if err != nil { - return "", 0, err + return "", 0, hasUPNPorNATPMP, err } - if isPublicIP == 1 { - return ip1, NATNone, nil + if hasPublicIP == 1 || hasUPNPorNATPMP == 1 { + return ip1, NATNone, hasUPNPorNATPMP, nil } - ip2, _, port2, err := natTest(host, udp2, localPort, 0) // 2rd nat test not need testing publicip - gLog.Printf(LevelDEBUG, "local port:%d nat port:%d", localPort, port2) + ip2, _, _, port2, err := natTest(host, udp2, localPort, 0) // 2rd nat test not need testing publicip + gLog.Printf(LvDEBUG, "local port:%d nat port:%d", localPort, port2) if err != nil { - return "", 0, err + return "", 0, hasUPNPorNATPMP, err } if ip1 != ip2 { - return "", 0, fmt.Errorf("ip have changed, please retry again") + return "", 0, hasUPNPorNATPMP, fmt.Errorf("ip have changed, please retry again") } natType := NATSymmetric if port1 == port2 { natType = NATCone } //TODO: NATNone - return ip1, natType, nil + return ip1, natType, hasUPNPorNATPMP, nil } func echo(echoPort int) { conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: echoPort}) if err != nil { - gLog.Println(LevelERROR, "echo server listen error:", err) + gLog.Println(LvERROR, "echo server listen error:", err) return } buf := make([]byte, 1600) defer conn.Close() // wait 5s for echo testing - conn.SetReadDeadline(time.Now().Add(time.Second * 5)) + conn.SetReadDeadline(time.Now().Add(time.Second * 30)) n, addr, err := conn.ReadFromUDP(buf) if err != nil { return diff --git a/openp2p.go b/openp2p.go index 2002c62..2b7074e 100644 --- a/openp2p.go +++ b/openp2p.go @@ -12,7 +12,7 @@ func main() { rand.Seed(time.Now().UnixNano()) binDir := filepath.Dir(os.Args[0]) os.Chdir(binDir) // for system service - gLog = InitLogger(binDir, "openp2p", LevelDEBUG, 1024*1024, LogFileAndConsole) + gLog = NewLogger(binDir, "openp2p", LvDEBUG, 1024*1024, LogFileAndConsole) // TODO: install sub command, deamon process if len(os.Args) > 1 { switch os.Args[1] { @@ -20,14 +20,14 @@ func main() { fmt.Println(OpenP2PVersion) return case "update": - gLog = InitLogger(filepath.Dir(os.Args[0]), "openp2p", LevelDEBUG, 1024*1024, LogFileAndConsole) + gLog = NewLogger(filepath.Dir(os.Args[0]), "openp2p", LvDEBUG, 1024*1024, LogFileAndConsole) targetPath := filepath.Join(defaultInstallPath, defaultBinName) d := daemon{} err := d.Control("restart", targetPath, nil) if err != nil { - gLog.Println(LevelERROR, "restart service error:", err) + gLog.Println(LvERROR, "restart service error:", err) } else { - gLog.Println(LevelINFO, "restart service ok.") + gLog.Println(LvINFO, "restart service ok.") } return case "install": @@ -40,9 +40,9 @@ func main() { } else { installByFilename() } - parseParams() - gLog.Println(LevelINFO, "openp2p start. version: ", OpenP2PVersion) - gLog.Println(LevelINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com") + parseParams("") + gLog.Println(LvINFO, "openp2p start. version: ", OpenP2PVersion) + gLog.Println(LvINFO, "Contact: QQ Group: 16947733, Email: openp2p.cn@gmail.com") if gConf.daemonMode { d := daemon{} @@ -50,14 +50,14 @@ func main() { return } - gLog.Println(LevelINFO, &gConf) + gLog.Println(LvINFO, &gConf) setFirewall() network := P2PNetworkInstance(&gConf.Network) if ok := network.Connect(30000); !ok { - gLog.Println(LevelERROR, "P2PNetwork login error") + gLog.Println(LvERROR, "P2PNetwork login error") return } - gLog.Println(LevelINFO, "waiting for connection...") + gLog.Println(LvINFO, "waiting for connection...") forever := make(chan bool) <-forever } diff --git a/overlay.go b/overlay.go index f805f41..f421915 100644 --- a/overlay.go +++ b/overlay.go @@ -40,8 +40,8 @@ type overlayConn struct { } func (oConn *overlayConn) run() { - gLog.Printf(LevelDEBUG, "%d overlayConn run start", oConn.id) - defer gLog.Printf(LevelDEBUG, "%d overlayConn run end", oConn.id) + gLog.Printf(LvDEBUG, "%d overlayConn run start", oConn.id) + defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id) oConn.running = true oConn.lastReadUDPTs = time.Now() buffer := make([]byte, ReadBuffLen+PaddingSize) @@ -58,7 +58,7 @@ func (oConn *overlayConn) run() { continue } // overlay tcp connection normal close, debug log - gLog.Printf(LevelDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err) + gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err) break } payload := buff[:dataLen] @@ -68,13 +68,13 @@ func (oConn *overlayConn) run() { writeBytes := append(tunnelHead.Bytes(), payload...) if oConn.rtid == 0 { oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) - gLog.Printf(LevelDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + gLog.Printf(LvDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) } else { // write raley data all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) all = append(all, writeBytes...) oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) - gLog.Printf(LevelDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + gLog.Printf(LvDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) } } if oConn.connTCP != nil { diff --git a/p2papp.go b/p2papp.go index da63519..c4b6da1 100644 --- a/p2papp.go +++ b/p2papp.go @@ -17,7 +17,7 @@ type p2pApp struct { listener net.Listener listenerUDP *net.UDPConn tunnel *P2PTunnel - rtid uint64 + rtid uint64 // relay tunnelID relayNode string relayMode string hbTime time.Time @@ -48,19 +48,19 @@ func (app *p2pApp) updateHeartbeat() { } func (app *p2pApp) listenTCP() error { - gLog.Printf(LevelDEBUG, "tcp accept on port %d start", app.config.SrcPort) - defer gLog.Printf(LevelDEBUG, "tcp accept on port %d end", app.config.SrcPort) + gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort) + defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort) var err error app.listener, err = net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) if err != nil { - gLog.Printf(LevelERROR, "listen error:%s", err) + gLog.Printf(LvERROR, "listen error:%s", err) return err } for app.running { conn, err := app.listener.Accept() if err != nil { if app.running { - gLog.Printf(LevelERROR, "%d accept error:%s", app.tunnel.id, err) + gLog.Printf(LvERROR, "%d accept error:%s", app.id, err) } break } @@ -81,7 +81,7 @@ func (app *p2pApp) listenTCP() error { oConn.appKeyBytes = encryptKey } app.tunnel.overlayConns.Store(oConn.id, &oConn) - gLog.Printf(LevelDEBUG, "Accept TCP overlayID:%d", oConn.id) + gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d", oConn.id) // tell peer connect req := OverlayConnectReq{ID: oConn.id, Token: app.tunnel.pn.config.Token, @@ -106,12 +106,12 @@ func (app *p2pApp) listenTCP() error { } func (app *p2pApp) listenUDP() error { - gLog.Printf(LevelDEBUG, "udp accept on port %d start", app.config.SrcPort) - defer gLog.Printf(LevelDEBUG, "udp accept on port %d end", app.config.SrcPort) + gLog.Printf(LvDEBUG, "udp accept on port %d start", app.config.SrcPort) + defer gLog.Printf(LvDEBUG, "udp accept on port %d end", app.config.SrcPort) var err error app.listenerUDP, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: app.config.SrcPort}) if err != nil { - gLog.Printf(LevelERROR, "listen error:%s", err) + gLog.Printf(LvERROR, "listen error:%s", err) return err } buffer := make([]byte, 64*1024) @@ -123,7 +123,7 @@ func (app *p2pApp) listenUDP() error { if ne, ok := err.(net.Error); ok && ne.Timeout() { continue } else { - gLog.Printf(LevelERROR, "udp read failed:%s", err) + gLog.Printf(LvERROR, "udp read failed:%s", err) break } } else { @@ -161,7 +161,7 @@ func (app *p2pApp) listenUDP() error { oConn.appKeyBytes = encryptKey } app.tunnel.overlayConns.Store(oConn.id, &oConn) - gLog.Printf(LevelDEBUG, "Accept UDP overlayID:%d", oConn.id) + gLog.Printf(LvDEBUG, "Accept UDP overlayID:%d", oConn.id) // tell peer connect req := OverlayConnectReq{ID: oConn.id, Token: app.tunnel.pn.config.Token, @@ -196,8 +196,8 @@ func (app *p2pApp) listenUDP() error { } func (app *p2pApp) listen() error { - gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort) - defer gLog.Printf(LevelINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort) + gLog.Printf(LvINFO, "LISTEN ON PORT %s:%d START", app.config.Protocol, app.config.SrcPort) + defer gLog.Printf(LvINFO, "LISTEN ON PORT %s:%d END", app.config.Protocol, app.config.SrcPort) app.wg.Add(1) defer app.wg.Done() app.running = true @@ -233,8 +233,8 @@ func (app *p2pApp) close() { func (app *p2pApp) relayHeartbeatLoop() { app.wg.Add(1) defer app.wg.Done() - gLog.Printf(LevelDEBUG, "relayHeartbeat to %d start", app.rtid) - defer gLog.Printf(LevelDEBUG, "relayHeartbeat to %d end", app.rtid) + gLog.Printf(LvDEBUG, "relayHeartbeat to %d start", app.rtid) + defer gLog.Printf(LvDEBUG, "relayHeartbeat to %d end", app.rtid) relayHead := new(bytes.Buffer) binary.Write(relayHead, binary.LittleEndian, app.rtid) req := RelayHeartbeat{RelayTunnelID: app.tunnel.id, diff --git a/p2pnetwork.go b/p2pnetwork.go index d8398a9..9f3934d 100644 --- a/p2pnetwork.go +++ b/p2pnetwork.go @@ -9,7 +9,6 @@ import ( "fmt" "math" "math/rand" - "net" "net/url" "strings" "sync" @@ -76,7 +75,7 @@ func (pn *P2PNetwork) run() { time.Sleep(NetworkHeartbeatTime) err := pn.init() if err != nil { - gLog.Println(LevelERROR, "P2PNetwork init error:", err) + gLog.Println(LvERROR, "P2PNetwork init error:", err) } } } @@ -123,7 +122,7 @@ func (pn *P2PNetwork) runAll() { pn.DeleteApp(*config) } if config.retryNum > 0 { - gLog.Printf(LevelINFO, "detect app %s(%d) disconnect, reconnecting the %d times...", config.AppName, appID, config.retryNum) + gLog.Printf(LvINFO, "detect app %s(%d) disconnect, reconnecting the %d times...", config.AppName, appID, config.retryNum) if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min config.retryNum = 0 } @@ -145,7 +144,7 @@ func (pn *P2PNetwork) runAll() { } } func (pn *P2PNetwork) autorunApp() { - gLog.Println(LevelINFO, "autorunApp start") + gLog.Println(LvINFO, "autorunApp start") for pn.running { time.Sleep(time.Second) if !pn.online { @@ -154,12 +153,12 @@ func (pn *P2PNetwork) autorunApp() { pn.runAll() time.Sleep(time.Second * 10) } - gLog.Println(LevelINFO, "autorunApp end") + gLog.Println(LvINFO, "autorunApp end") } -func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint64) (*P2PTunnel, uint64, string, error) { - gLog.Printf(LevelINFO, "addRelayTunnel to %s start", config.PeerNode) - defer gLog.Printf(LevelINFO, "addRelayTunnel to %s end", config.PeerNode) +func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, string, error) { + gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode) + defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode) pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode}) head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, time.Second*10) if head == nil { @@ -168,20 +167,20 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint rsp := RelayNodeRsp{} err := json.Unmarshal(body, &rsp) if err != nil { - gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") } if rsp.RelayName == "" || rsp.RelayToken == 0 { - gLog.Printf(LevelERROR, "MsgRelayNodeReq error") + gLog.Printf(LvERROR, "MsgRelayNodeReq error") return nil, 0, "", errors.New("MsgRelayNodeReq error") } - gLog.Printf(LevelINFO, "got relay node:%s", rsp.RelayName) + gLog.Printf(LvINFO, "got relay node:%s", rsp.RelayName) relayConfig := config relayConfig.PeerNode = rsp.RelayName relayConfig.peerToken = rsp.RelayToken t, err := pn.addDirectTunnel(relayConfig, 0) if err != nil { - gLog.Println(LevelERROR, "direct connect error:", err) + gLog.Println(LvERROR, "direct connect error:", err) return nil, 0, "", err } // notify peer addRelayTunnel @@ -189,22 +188,20 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint From: pn.config.Node, RelayName: rsp.RelayName, RelayToken: rsp.RelayToken, - AppID: appid, - AppKey: appkey, } - gLog.Printf(LevelINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName) + gLog.Printf(LvINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName) pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req) // wait relay ready head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value if head == nil { - gLog.Printf(LevelERROR, "read MsgPushAddRelayTunnelRsp error") + gLog.Printf(LvERROR, "read MsgPushAddRelayTunnelRsp error") return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error") } rspID := TunnelMsg{} err = json.Unmarshal(body, &rspID) if err != nil { - gLog.Printf(LevelERROR, "wrong RelayNodeRsp:%s", err) + gLog.Printf(LvERROR, "wrong RelayNodeRsp:%s", err) return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") } return t, rspID.ID, rsp.Mode, err @@ -212,8 +209,8 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig, appid uint64, appkey uint // use *AppConfig to save status func (pn *P2PNetwork) AddApp(config AppConfig) error { - gLog.Printf(LevelINFO, "addApp %s to %s:%s:%d start", config.AppName, config.PeerNode, config.DstHost, config.DstPort) - defer gLog.Printf(LevelINFO, "addApp %s to %s:%s:%d end", config.AppName, config.PeerNode, config.DstHost, config.DstPort) + gLog.Printf(LvINFO, "addApp %s to %s:%s:%d start", config.AppName, config.PeerNode, config.DstHost, config.DstPort) + defer gLog.Printf(LvINFO, "addApp %s to %s:%s:%d end", config.AppName, config.PeerNode, config.DstHost, config.DstPort) if !pn.online { return errors.New("P2PNetwork offline") } @@ -240,9 +237,8 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { peerIP = t.config.peerIP } if err != nil && err == ErrorHandshake { - gLog.Println(LevelERROR, "direct connect failed, try to relay") - appKey = rand.Uint64() - t, rtid, relayMode, err = pn.addRelayTunnel(config, appID, appKey) + gLog.Println(LvERROR, "direct connect failed, try to relay") + t, rtid, relayMode, err = pn.addRelayTunnel(config) if t != nil { relayNode = t.config.PeerNode } @@ -269,6 +265,16 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { if err != nil { return err } + if rtid != 0 || t.conn.Protocol() == "tcp" { + // sync appkey + appKey = rand.Uint64() + req := APPKeySync{ + AppID: appID, + AppKey: appKey, + } + gLog.Printf(LvINFO, "sync appkey to %s", config.PeerNode) + pn.push(config.PeerNode, MsgPushAPPKey, &req) + } app := p2pApp{ id: appID, key: appKey, @@ -286,21 +292,21 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { } func (pn *P2PNetwork) DeleteApp(config AppConfig) { - gLog.Printf(LevelINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort) - defer gLog.Printf(LevelINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort) + gLog.Printf(LvINFO, "DeleteApp %s%d start", config.Protocol, config.SrcPort) + defer gLog.Printf(LvINFO, "DeleteApp %s%d end", config.Protocol, config.SrcPort) // close the apps of this config i, ok := pn.apps.Load(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) if ok { app := i.(*p2pApp) - gLog.Printf(LevelINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) + gLog.Printf(LvINFO, "app %s exist, delete it", fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) app.close() pn.apps.Delete(fmt.Sprintf("%s%d", config.Protocol, config.SrcPort)) } } func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, error) { - gLog.Printf(LevelDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) - defer gLog.Printf(LevelDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) isClient := false // client side tid=0, assign random uint64 if tid == 0 { @@ -320,11 +326,11 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, } // client side checking - gLog.Println(LevelINFO, "tunnel already exist ", config.PeerNode) + gLog.Println(LvINFO, "tunnel already exist ", config.PeerNode) isActive := t.checkActive() // inactive, close it if !isActive { - gLog.Println(LevelINFO, "but it's not active, close it ", config.PeerNode) + gLog.Println(LvINFO, "but it's not active, close it ", config.PeerNode) t.close() } else { // active @@ -346,47 +352,40 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, t.init() if isClient { if err := t.connect(); err != nil { - gLog.Println(LevelERROR, "p2pTunnel connect error:", err) + gLog.Println(LvERROR, "p2pTunnel connect error:", err) return t, err } } else { - rsp := PushConnectRsp{ - Error: 0, - Detail: "connect ok", - To: t.config.PeerNode, - From: pn.config.Node, - NatType: pn.config.natType, - FromIP: pn.config.publicIP, - ConeNatPort: t.coneNatPort, - ID: t.id} - t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp) if err := t.listen(); err != nil { - gLog.Println(LevelERROR, "p2pTunnel listen error:", err) + gLog.Println(LvERROR, "p2pTunnel listen error:", err) return t, err } } } // store it when success - gLog.Printf(LevelDEBUG, "store tunnel %d", tid) + gLog.Printf(LvDEBUG, "store tunnel %d", tid) pn.allTunnels.Store(tid, t) return t, nil } func (pn *P2PNetwork) init() error { - gLog.Println(LevelINFO, "init start") + gLog.Println(LvINFO, "init start") var err error for { // detect nat type - pn.config.publicIP, pn.config.natType, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2) - // TODO rm test s2s + pn.config.publicIP, pn.config.natType, pn.config.hasUPNPorNATPMP, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2) + // for testcase if strings.Contains(pn.config.Node, "openp2pS2STest") { pn.config.natType = NATSymmetric } + if strings.Contains(pn.config.Node, "openp2pC2CTest") { + pn.config.natType = NATCone + } if err != nil { - gLog.Println(LevelDEBUG, "detect NAT type error:", err) + gLog.Println(LvDEBUG, "detect NAT type error:", err) break } - gLog.Println(LevelDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) + gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) forwardPath := "/openp2p/v1/login" config := tls.Config{InsecureSkipVerify: true} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert @@ -419,28 +418,30 @@ func (pn *P2PNetwork) init() error { pn.config.os = getOsName() req := ReportBasic{ - Mac: pn.config.mac, - LanIP: pn.config.localIP, - OS: pn.config.os, - Version: OpenP2PVersion, + Mac: pn.config.mac, + LanIP: pn.config.localIP, + OS: pn.config.os, + HasIPv4: pn.config.hasIPv4, + HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP, + Version: OpenP2PVersion, } rsp := netInfo() - gLog.Println(LevelDEBUG, "netinfo:", rsp) + gLog.Println(LvDEBUG, "netinfo:", rsp) if rsp != nil && rsp.Country != "" { - if len(rsp.IP) == net.IPv6len { + if IsIPv6(rsp.IP.String()) { pn.config.ipv6 = rsp.IP.String() req.IPv6 = rsp.IP.String() } req.NetInfo = *rsp } pn.write(MsgReport, MsgReportBasic, &req) - gLog.Println(LevelDEBUG, "P2PNetwork init ok") + gLog.Println(LvDEBUG, "P2PNetwork init ok") break } if err != nil { // init failed, retry pn.restartCh <- true - gLog.Println(LevelERROR, "P2PNetwork init error:", err) + gLog.Println(LvERROR, "P2PNetwork init error:", err) } return err } @@ -449,7 +450,7 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { head := openP2PHeader{} err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, &head) if err != nil { - gLog.Println(LevelERROR, "handleMessage error:", err) + gLog.Println(LvERROR, "handleMessage error:", err) return } switch head.MainType { @@ -458,11 +459,11 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { rsp := LoginRsp{} err = json.Unmarshal(msg[openP2PHeaderSize:], &rsp) if err != nil { - gLog.Printf(LevelERROR, "wrong login response:%s", err) + gLog.Printf(LvERROR, "wrong login response:%s", err) return } if rsp.Error != 0 { - gLog.Printf(LevelERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail) + gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail) pn.running = false } else { pn.serverTs = rsp.Ts @@ -472,10 +473,10 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { gConf.setUser(rsp.User) gConf.save() pn.localTs = time.Now().Unix() - gLog.Printf(LevelINFO, "login ok. user=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Ts, pn.localTs) + gLog.Printf(LvINFO, "login ok. user=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Ts, pn.localTs) } case MsgHeartbeat: - gLog.Printf(LevelDEBUG, "P2PNetwork heartbeat ok") + gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok") case MsgPush: handlePush(pn, head.SubType, msg) default: @@ -488,21 +489,21 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { } func (pn *P2PNetwork) readLoop() { - gLog.Printf(LevelDEBUG, "P2PNetwork readLoop start") + gLog.Printf(LvDEBUG, "P2PNetwork readLoop start") pn.wg.Add(1) defer pn.wg.Done() for pn.running { pn.conn.SetReadDeadline(time.Now().Add(NetworkHeartbeatTime + 10*time.Second)) t, msg, err := pn.conn.ReadMessage() if err != nil { - gLog.Printf(LevelERROR, "P2PNetwork read error:%s", err) + gLog.Printf(LvERROR, "P2PNetwork read error:%s", err) pn.conn.Close() pn.restartCh <- true break } pn.handleMessage(t, msg) } - gLog.Printf(LevelDEBUG, "P2PNetwork readLoop end") + gLog.Printf(LvDEBUG, "P2PNetwork readLoop end") } func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) error { @@ -516,14 +517,14 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) pn.writeMtx.Lock() defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { - gLog.Printf(LevelERROR, "write msgType %d,%d error:%s", mainType, subType, err) + gLog.Printf(LvERROR, "write msgType %d,%d error:%s", mainType, subType, err) pn.conn.Close() } return err } func (pn *P2PNetwork) relay(to uint64, body []byte) error { - gLog.Printf(LevelDEBUG, "relay data to %d", to) + gLog.Printf(LvDEBUG, "relay data to %d", to) i, ok := pn.allTunnels.Load(to) if !ok { return nil @@ -537,7 +538,7 @@ func (pn *P2PNetwork) relay(to uint64, body []byte) error { } func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error { - gLog.Printf(LevelDEBUG, "push msgType %d to %s", subType, to) + gLog.Printf(LvDEBUG, "push msgType %d to %s", subType, to) if !pn.online { return errors.New("client offline") } @@ -559,7 +560,7 @@ func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error pn.writeMtx.Lock() defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil { - gLog.Printf(LevelERROR, "push to %s error:%s", to, err) + gLog.Printf(LvERROR, "push to %s error:%s", to, err) pn.conn.Close() } return err @@ -578,13 +579,13 @@ func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout for { select { case <-time.After(timeout): - gLog.Printf(LevelERROR, "wait msg%d:%d timeout", mainType, subType) + gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType) return case msg := <-ch: head = &openP2PHeader{} err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { - gLog.Println(LevelERROR, "read msg error:", err) + gLog.Println(LvERROR, "read msg error:", err) break } if head.MainType != mainType || head.SubType != subType { diff --git a/p2ptunnel.go b/p2ptunnel.go index 6b648f8..e6bce01 100644 --- a/p2ptunnel.go +++ b/p2ptunnel.go @@ -14,7 +14,7 @@ import ( type P2PTunnel struct { pn *P2PNetwork - conn p2pConn + conn underlay hbTime time.Time hbMtx sync.Mutex hbTimeRelay time.Time @@ -25,7 +25,7 @@ type P2PTunnel struct { id uint64 running bool runMtx sync.Mutex - isServer bool // 0:server 1:client + tunnelServer bool // different from underlayServer coneLocalPort int coneNatPort int } @@ -39,29 +39,47 @@ func (t *P2PTunnel) init() { localPort := int(rand.Uint32()%10000 + 50000) if t.pn.config.natType == NATCone { // prepare one random cone hole - _, _, port1, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort, 0) + _, _, _, port1, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort, 0) t.coneLocalPort = localPort t.coneNatPort = port1 t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} } else { t.coneLocalPort = localPort - t.coneNatPort = localPort // NATNONE or symmetric doesn't need coneNatPort + t.coneNatPort = localPort // symmetric doesn't need coneNatPort + if t.pn.config.hasUPNPorNATPMP == 1 { + nat, err := Discover() + if err != nil { + gLog.Println(LvDEBUG, "could not perform UPNP discover:", err) + } else { + externalPort, err := nat.AddPortMapping("tcp", localPort, localPort, "openp2p", 30) // timeout the connection still alive, make the timeout short + if err != nil { + gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort) + } + } + } t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} } - gLog.Printf(LevelDEBUG, "prepare punching port %d:%d", t.coneLocalPort, t.coneNatPort) + gLog.Printf(LvDEBUG, "prepare punching port %d:%d", t.coneLocalPort, t.coneNatPort) } func (t *P2PTunnel) connect() error { - gLog.Printf(LevelDEBUG, "start p2pTunnel to %s ", t.config.PeerNode) - t.isServer = false + gLog.Printf(LvDEBUG, "start p2pTunnel to %s ", t.config.PeerNode) + t.tunnelServer = false + appKey := uint64(0) req := PushConnectReq{ - Token: t.config.peerToken, - From: t.pn.config.Node, - FromToken: t.pn.config.Token, - FromIP: t.pn.config.publicIP, - ConeNatPort: t.coneNatPort, - NatType: t.pn.config.natType, - ID: t.id} + Token: t.config.peerToken, + From: t.pn.config.Node, + FromToken: t.pn.config.Token, + FromIP: t.pn.config.publicIP, + ConeNatPort: t.coneNatPort, + NatType: t.pn.config.natType, + HasIPv4: t.pn.config.hasIPv4, + IPv6: t.pn.config.IPv6, + HasUPNPorNATPMP: t.pn.config.hasUPNPorNATPMP, + ID: t.id, + AppKey: appKey, + Version: OpenP2PVersion, + } t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10) if head == nil { @@ -70,19 +88,23 @@ func (t *P2PTunnel) connect() error { rsp := PushConnectRsp{} err := json.Unmarshal(body, &rsp) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgPushConnectRsp:%s", err) + gLog.Printf(LvERROR, "wrong MsgPushConnectRsp:%s", err) return err } // gLog.Println(LevelINFO, rsp) if rsp.Error != 0 { return errors.New(rsp.Detail) } - t.config.peerNatType = int(rsp.NatType) + t.config.peerNatType = rsp.NatType + t.config.hasIPv4 = rsp.HasIPv4 + t.config.IPv6 = rsp.IPv6 + t.config.hasUPNPorNATPMP = rsp.HasUPNPorNATPMP + t.config.peerVersion = rsp.Version t.config.peerConeNatPort = rsp.ConeNatPort t.config.peerIP = rsp.FromIP - err = t.handshake() + err = t.start() if err != nil { - gLog.Println(LevelERROR, "handshake error:", err) + gLog.Println(LvERROR, "handshake error:", err) err = ErrorHandshake } return err @@ -135,6 +157,20 @@ func (t *P2PTunnel) close() { t.pn.allTunnels.Delete(t.id) } +func (t *P2PTunnel) start() error { + if !t.isSupportTCP() { + if err := t.handshake(); err != nil { + return err + } + } + err := t.connectUnderlay() + if err != nil { + gLog.Println(LvERROR, err) + return err + } + return nil +} + func (t *P2PTunnel) handshake() error { if t.config.peerConeNatPort > 0 { var err error @@ -143,7 +179,7 @@ func (t *P2PTunnel) handshake() error { return err } } - gLog.Println(LevelDEBUG, "handshake to ", t.config.PeerNode) + gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode) var err error // TODO: handle NATNone, nodes with public ip has no punching if (t.pn.config.natType == NATCone && t.config.peerNatType == NATCone) || (t.pn.config.natType == NATNone || t.config.peerNatType == NATNone) { @@ -159,50 +195,70 @@ func (t *P2PTunnel) handshake() error { return errors.New("unknown error") } if err != nil { - gLog.Println(LevelERROR, "punch handshake error:", err) - return err - } - gLog.Printf(LevelDEBUG, "handshake to %s ok", t.config.PeerNode) - err = t.run() - if err != nil { - gLog.Println(LevelERROR, err) + gLog.Println(LvERROR, "punch handshake error:", err) return err } + gLog.Printf(LvDEBUG, "handshake to %s ok", t.config.PeerNode) return nil } -func (t *P2PTunnel) run() error { - if t.isServer { - qConn, e := listenQuic(t.la.String(), TunnelIdleTimeout) - if e != nil { - gLog.Println(LevelINFO, "listen quic error:", e, ", retry...") - time.Sleep(time.Millisecond * 10) - qConn, e = listenQuic(t.la.String(), TunnelIdleTimeout) - if e != nil { - return fmt.Errorf("listen quic error:%s", e) - } +func (t *P2PTunnel) connectUnderlay() (err error) { + if !t.isSupportTCP() { + t.conn, err = t.connectUnderlayQuic() + if err != nil { + return err } - t.pn.push(t.config.PeerNode, MsgPushQuicConnect, nil) - e = qConn.Accept() - if e != nil { + } else { + // TODO: udp or tcp first? + // prepare a la ra for udp + // t.conn, err = t.connectUnderlayQuic() + // TODO: support ipv6 + // if t.pn.config.hasIPv4 == 1 || t.config.hasIPv4 == 1 { + t.conn, err = t.connectUnderlayTCP() + if err != nil { + return err + } + // } + // if IsIPv6(t.pn.config.IPv6) && IsIPv6(t.config.IPv6) { // both have ipv6 + // t.conn, err = t.connectUnderlayTCP6() + // if err != nil { + // return err + // } + // } + } + t.setRun(true) + go t.readLoop() + go t.heartbeatLoop() + return nil +} + +func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { + gLog.Println(LvINFO, "connectUnderlayQuic start") + defer gLog.Println(LvINFO, "connectUnderlayQuic end") + var qConn *underlayQUIC + if t.isUnderlayServer() { + time.Sleep(time.Millisecond * 10) // punching udp port will need some times in some env + qConn, err = listenQuic(t.la.String(), TunnelIdleTimeout) + if err != nil { + gLog.Println(LvINFO, "listen quic error:", err, ", retry...") + } + t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + err = qConn.Accept() + if err != nil { qConn.CloseListener() - return fmt.Errorf("accept quic error:%s", e) + return nil, fmt.Errorf("accept quic error:%s", err) } - _, buff, err := qConn.ReadMessage() - if e != nil { + _, buff, err := qConn.ReadBuffer() + if err != nil { qConn.listener.Close() - return fmt.Errorf("read start msg error:%s", err) + return nil, fmt.Errorf("read start msg error:%s", err) } if buff != nil { - gLog.Println(LevelDEBUG, string(buff)) + gLog.Println(LvDEBUG, string(buff)) } qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) - gLog.Println(LevelDEBUG, "quic connection ok") - t.conn = qConn - t.setRun(true) - go t.readLoop() - go t.writeLoop() - return nil + gLog.Println(LvDEBUG, "quic connection ok") + return qConn, nil } //else @@ -211,44 +267,133 @@ func (t *P2PTunnel) run() error { time.Sleep(time.Millisecond * 10) conn, e = net.ListenUDP("udp", t.la) if e != nil { - return fmt.Errorf("quic listen error:%s", e) + return nil, fmt.Errorf("quic listen error:%s", e) } } - t.pn.read(t.config.PeerNode, MsgPush, MsgPushQuicConnect, time.Second*5) - gLog.Println(LevelDEBUG, "quic dial to ", t.ra.String()) - qConn, e := dialQuic(conn, t.ra, TunnelIdleTimeout) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) + gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) + qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) if e != nil { - return fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e) + return nil, fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e) } handshakeBegin := time.Now() qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) - _, buff, err := qConn.ReadMessage() + _, buff, err := qConn.ReadBuffer() if e != nil { qConn.listener.Close() - return fmt.Errorf("read MsgTunnelHandshake error:%s", err) + return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) } if buff != nil { - gLog.Println(LevelDEBUG, string(buff)) + gLog.Println(LvDEBUG, string(buff)) } - gLog.Println(LevelINFO, "rtt=", time.Since(handshakeBegin)) - gLog.Println(LevelDEBUG, "quic connection ok") - t.conn = qConn - t.setRun(true) - go t.readLoop() - go t.writeLoop() - return nil + gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) + gLog.Println(LvDEBUG, "quic connection ok") + return qConn, nil +} + +// websocket +func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { + gLog.Println(LvINFO, "connectUnderlayTCP start") + defer gLog.Println(LvINFO, "connectUnderlayTCP end") + var qConn *underlayTCP + if t.isUnderlayServer() { + t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + qConn, err = listenTCP(t.coneNatPort, TunnelIdleTimeout) + if err != nil { + return nil, fmt.Errorf("listen TCP error:%s", err) + } + _, buff, err := qConn.ReadBuffer() + if err != nil { + qConn.listener.Close() + return nil, fmt.Errorf("read start msg error:%s", err) + } + if buff != nil { + gLog.Println(LvDEBUG, string(buff)) + } + qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) + gLog.Println(LvDEBUG, "TCP connection ok") + return qConn, nil + } + + //else + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) + gLog.Println(LvDEBUG, "TCP dial to ", t.ra.String()) + qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort) + if err != nil { + return nil, fmt.Errorf("TCP dial to %s error:%s", t.ra.String(), err) + } + handshakeBegin := time.Now() + qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) + _, buff, err := qConn.ReadBuffer() + if err != nil { + qConn.listener.Close() + return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) + } + if buff != nil { + gLog.Println(LvDEBUG, string(buff)) + } + + gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) + gLog.Println(LvDEBUG, "TCP connection ok") + return qConn, nil +} + +func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { + gLog.Println(LvINFO, "connectUnderlayTCP start") + defer gLog.Println(LvINFO, "connectUnderlayTCP end") + var qConn *underlayTCP6 + if t.isUnderlayServer() { + t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + qConn, err = listenTCP6(t.coneNatPort, TunnelIdleTimeout) + if err != nil { + return nil, fmt.Errorf("listen TCP error:%s", err) + } + _, buff, err := qConn.ReadBuffer() + if err != nil { + qConn.listener.Close() + return nil, fmt.Errorf("read start msg error:%s", err) + } + if buff != nil { + gLog.Println(LvDEBUG, string(buff)) + } + qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) + gLog.Println(LvDEBUG, "TCP connection ok") + return qConn, nil + } + + //else + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) + gLog.Println(LvDEBUG, "TCP dial to ", t.ra.String()) + qConn, err = dialTCP6(t.config.IPv6, t.config.peerConeNatPort) + if err != nil { + return nil, fmt.Errorf("TCP dial to %s error:%s", t.ra.String(), err) + } + handshakeBegin := time.Now() + qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) + _, buff, err := qConn.ReadBuffer() + if err != nil { + qConn.listener.Close() + return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) + } + if buff != nil { + gLog.Println(LvDEBUG, string(buff)) + } + + gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) + gLog.Println(LvDEBUG, "TCP connection ok") + return qConn, nil } func (t *P2PTunnel) readLoop() { decryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding - gLog.Printf(LevelDEBUG, "%d tunnel readloop start", t.id) + gLog.Printf(LvDEBUG, "%d tunnel readloop start", t.id) for t.isRuning() { t.conn.SetReadDeadline(time.Now().Add(TunnelIdleTimeout)) - head, body, err := t.conn.ReadMessage() + head, body, err := t.conn.ReadBuffer() if err != nil { if t.isRuning() { - gLog.Printf(LevelERROR, "%d tunnel read error:%s", t.id, err) + gLog.Printf(LvERROR, "%d tunnel read error:%s", t.id, err) } break } @@ -258,22 +403,22 @@ func (t *P2PTunnel) readLoop() { switch head.SubType { case MsgTunnelHeartbeat: t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeatAck, nil) - gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat", t.id) + gLog.Printf(LvDEBUG, "%d read tunnel heartbeat", t.id) case MsgTunnelHeartbeatAck: t.hbMtx.Lock() t.hbTime = time.Now() t.hbMtx.Unlock() - gLog.Printf(LevelDEBUG, "%d read tunnel heartbeat ack", t.id) + gLog.Printf(LvDEBUG, "%d read tunnel heartbeat ack", t.id) case MsgOverlayData: if len(body) < overlayHeaderSize { continue } overlayID := binary.LittleEndian.Uint64(body[:8]) - gLog.Printf(LevelDEBUG, "%d tunnel read overlay data %d", t.id, overlayID) + gLog.Printf(LvDEBUG, "%d tunnel read overlay data %d", t.id, overlayID) s, ok := t.overlayConns.Load(overlayID) if !ok { // debug level, when overlay connection closed, always has some packet not found tunnel - gLog.Printf(LevelDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID) + gLog.Printf(LvDEBUG, "%d tunnel not found overlay connection %d", t.id, overlayID) continue } overlayConn, ok := s.(*overlayConn) @@ -287,10 +432,10 @@ func (t *P2PTunnel) readLoop() { } _, err = overlayConn.Write(payload) if err != nil { - gLog.Println(LevelERROR, "overlay write error:", err) + gLog.Println(LvERROR, "overlay write error:", err) } case MsgRelayData: - gLog.Printf(LevelDEBUG, "got relay data datalen=%d", head.DataLen) + gLog.Printf(LvDEBUG, "got relay data datalen=%d", head.DataLen) if len(body) < 8 { continue } @@ -300,10 +445,10 @@ func (t *P2PTunnel) readLoop() { req := RelayHeartbeat{} err := json.Unmarshal(body, &req) if err != nil { - gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err) + gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err) continue } - gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) + gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) relayHead := new(bytes.Buffer) binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID) msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req) @@ -313,26 +458,26 @@ func (t *P2PTunnel) readLoop() { req := RelayHeartbeat{} err := json.Unmarshal(body, &req) if err != nil { - gLog.Printf(LevelERROR, "wrong RelayHeartbeat:%s", err) + gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err) continue } - gLog.Printf(LevelDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID) + gLog.Printf(LvDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID) t.pn.updateAppHeartbeat(req.AppID) case MsgOverlayConnectReq: req := OverlayConnectReq{} err := json.Unmarshal(body, &req) if err != nil { - gLog.Printf(LevelERROR, "wrong MsgOverlayConnectReq:%s", err) + gLog.Printf(LvERROR, "wrong MsgOverlayConnectReq:%s", err) continue } // app connect only accept token(not relay totp token), avoid someone using the share relay node's token if req.Token != t.pn.config.Token { - gLog.Println(LevelERROR, "Access Denied:", req.Token) + gLog.Println(LvERROR, "Access Denied:", req.Token) continue } overlayID := req.ID - gLog.Printf(LevelDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) + gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) oConn := overlayConn{ tunnel: t, id: overlayID, @@ -347,7 +492,7 @@ func (t *P2PTunnel) readLoop() { oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) } if err != nil { - gLog.Println(LevelERROR, err) + gLog.Println(LvERROR, err) continue } @@ -365,11 +510,11 @@ func (t *P2PTunnel) readLoop() { req := OverlayDisconnectReq{} err := json.Unmarshal(body, &req) if err != nil { - gLog.Printf(LevelERROR, "wrong OverlayDisconnectRequest:%s", err) + gLog.Printf(LvERROR, "wrong OverlayDisconnectRequest:%s", err) continue } overlayID := req.ID - gLog.Printf(LevelDEBUG, "%d disconnect overlay connection %d", t.id, overlayID) + gLog.Printf(LvDEBUG, "%d disconnect overlay connection %d", t.id, overlayID) i, ok := t.overlayConns.Load(overlayID) if ok { oConn := i.(*overlayConn) @@ -380,32 +525,49 @@ func (t *P2PTunnel) readLoop() { } t.setRun(false) t.conn.Close() - gLog.Printf(LevelDEBUG, "%d tunnel readloop end", t.id) + gLog.Printf(LvDEBUG, "%d tunnel readloop end", t.id) } -func (t *P2PTunnel) writeLoop() { +func (t *P2PTunnel) heartbeatLoop() { tc := time.NewTicker(TunnelHeartbeatTime) defer tc.Stop() - defer gLog.Printf(LevelDEBUG, "%d tunnel writeloop end", t.id) + gLog.Printf(LvDEBUG, "%d tunnel heartbeatLoop start", t.id) + defer gLog.Printf(LvDEBUG, "%d tunnel heartbeatLoop end", t.id) for t.isRuning() { select { case <-tc.C: // tunnel send err := t.conn.WriteBytes(MsgP2P, MsgTunnelHeartbeat, nil) if err != nil { - gLog.Printf(LevelERROR, "%d write tunnel heartbeat error %s", t.id, err) + gLog.Printf(LvERROR, "%d write tunnel heartbeat error %s", t.id, err) t.setRun(false) return } - gLog.Printf(LevelDEBUG, "%d write tunnel heartbeat ok", t.id) + gLog.Printf(LvDEBUG, "%d write tunnel heartbeat ok", t.id) } } } func (t *P2PTunnel) listen() error { - gLog.Printf(LevelDEBUG, "p2ptunnel wait for connecting") - t.isServer = true - return t.handshake() + // notify client to connect + rsp := PushConnectRsp{ + Error: 0, + Detail: "connect ok", + To: t.config.PeerNode, + From: t.pn.config.Node, + NatType: t.pn.config.natType, + HasIPv4: t.pn.config.hasIPv4, + IPv6: t.pn.config.IPv6, + HasUPNPorNATPMP: t.pn.config.hasUPNPorNATPMP, + FromIP: t.pn.config.publicIP, + ConeNatPort: t.coneNatPort, + ID: t.id, + Version: OpenP2PVersion, + } + t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp) + gLog.Printf(LvDEBUG, "p2ptunnel wait for connecting") + t.tunnelServer = true + return t.start() } func (t *P2PTunnel) closeOverlayConns(appID uint64) { @@ -424,3 +586,24 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) { return true }) } + +func (t *P2PTunnel) isUnderlayServer() bool { + if t.pn.config.natType == NATNone && t.config.peerNatType != NATNone { + return true + } + if t.pn.config.natType != NATNone && t.config.peerNatType == NATNone { + return false + } + // NAT or both has public IP + return t.tunnelServer +} + +func (t *P2PTunnel) isSupportTCP() bool { + if t.config.peerVersion == "" || compareVersion(t.config.peerVersion, LeastSupportTCPVersion) == LESS { + return false + } + if t.pn.config.natType == NATNone || t.config.peerNatType == NATNone { + return true + } + return false +} diff --git a/protocol.go b/protocol.go index 73585da..04eac44 100644 --- a/protocol.go +++ b/protocol.go @@ -10,8 +10,9 @@ import ( "time" ) -const OpenP2PVersion = "1.4.2" +const OpenP2PVersion = "1.5.6" const ProducnName string = "openp2p" +const LeastSupportTCPVersion = "1.5.0" type openP2PHeader struct { DataLen uint32 @@ -69,6 +70,7 @@ const ( MsgReport = 6 ) +// TODO: seperate node push and web push. const ( MsgPushRsp = 0 MsgPushConnectReq = 1 @@ -78,11 +80,12 @@ const ( MsgPushAddRelayTunnelRsp = 5 MsgPushUpdate = 6 MsgPushReportApps = 7 - MsgPushQuicConnect = 8 + MsgPushUnderlayConnect = 8 MsgPushEditApp = 9 MsgPushSwitchApp = 10 MsgPushRestart = 11 MsgPushEditNode = 12 + MsgPushAPPKey = 13 ) // MsgP2P sub type message @@ -131,7 +134,7 @@ const ( AESKeySize = 16 MaxRetry = 10 RetryInterval = time.Second * 30 - PublicIPEchoTimeout = time.Second * 3 + PublicIPEchoTimeout = time.Second * 1 NatTestTimeout = time.Second * 10 ClientAPITimeout = time.Second * 10 MaxDirectTry = 5 @@ -145,6 +148,13 @@ const ( NATUnknown = 314 ) +// underlay protocol +const ( + UderlayAuto = "auto" + UderlayQUIC = "quic" + UderlayTCP = "tcp" +) + func newMessage(mainType uint16, subType uint16, packet interface{}) ([]byte, error) { data, err := json.Marshal(packet) if err != nil { @@ -170,23 +180,32 @@ func nodeNameToID(name string) uint64 { } type PushConnectReq struct { - From string `json:"from,omitempty"` - FromToken uint64 `json:"fromToken,omitempty"` //my token - Token uint64 `json:"token,omitempty"` // totp token - ConeNatPort int `json:"coneNatPort,omitempty"` - NatType int `json:"natType,omitempty"` - FromIP string `json:"fromIP,omitempty"` - ID uint64 `json:"id,omitempty"` + From string `json:"from,omitempty"` + FromToken uint64 `json:"fromToken,omitempty"` //my token + Version string `json:"version,omitempty"` + Token uint64 `json:"token,omitempty"` // totp token + ConeNatPort int `json:"coneNatPort,omitempty"` // if isPublic, is public port + NatType int `json:"natType,omitempty"` + HasIPv4 int `json:"hasIPv4,omitempty"` + IPv6 string `json:"IPv6,omitempty"` + HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"` + FromIP string `json:"fromIP,omitempty"` + ID uint64 `json:"id,omitempty"` + AppKey uint64 `json:"appKey,omitempty"` // for underlay tcp } type PushConnectRsp struct { - Error int `json:"error,omitempty"` - From string `json:"from,omitempty"` - To string `json:"to,omitempty"` - Detail string `json:"detail,omitempty"` - NatType int `json:"natType,omitempty"` - ConeNatPort int `json:"coneNatPort,omitempty"` - FromIP string `json:"fromIP,omitempty"` - ID uint64 `json:"id,omitempty"` + Error int `json:"error,omitempty"` + From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + Detail string `json:"detail,omitempty"` + NatType int `json:"natType,omitempty"` + HasIPv4 int `json:"hasIPv4,omitempty"` + IPv6 string `json:"IPv6,omitempty"` + HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"` + ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole + FromIP string `json:"fromIP,omitempty"` + ID uint64 `json:"id,omitempty"` + Version string `json:"version,omitempty"` } type PushRsp struct { Error int `json:"error,omitempty"` @@ -246,8 +265,13 @@ type AddRelayTunnelReq struct { From string `json:"from,omitempty"` RelayName string `json:"relayName,omitempty"` RelayToken uint64 `json:"relayToken,omitempty"` - AppID uint64 `json:"appID,omitempty"` - AppKey uint64 `json:"appKey,omitempty"` + AppID uint64 `json:"appID,omitempty"` // deprecated + AppKey uint64 `json:"appKey,omitempty"` // deprecated +} + +type APPKeySync struct { + AppID uint64 `json:"appID,omitempty"` + AppKey uint64 `json:"appKey,omitempty"` } type RelayHeartbeat struct { @@ -256,12 +280,14 @@ type RelayHeartbeat struct { } type ReportBasic struct { - OS string `json:"os,omitempty"` - Mac string `json:"mac,omitempty"` - LanIP string `json:"lanIP,omitempty"` - IPv6 string `json:"IPv6,omitempty"` - Version string `json:"version,omitempty"` - NetInfo NetInfo `json:"netInfo,omitempty"` + OS string `json:"os,omitempty"` + Mac string `json:"mac,omitempty"` + LanIP string `json:"lanIP,omitempty"` + HasIPv4 int `json:"hasIPv4,omitempty"` + IPv6 string `json:"IPv6,omitempty"` + HasUPNPorNATPMP int `json:"hasUPNPorNATPMP,omitempty"` + Version string `json:"version,omitempty"` + NetInfo NetInfo `json:"netInfo,omitempty"` } type ReportConnect struct { diff --git a/udp.go b/udp.go index 3ec9a26..d92e99c 100644 --- a/udp.go +++ b/udp.go @@ -23,7 +23,7 @@ func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, deadline := time.Now().Add(time.Millisecond * time.Duration(timeout)) err = conn.SetReadDeadline(deadline) if err != nil { - gLog.Println(LevelERROR, "SetReadDeadline error") + gLog.Println(LvERROR, "SetReadDeadline error") return nil, nil, nil, 0, err } } @@ -37,7 +37,7 @@ func UDPRead(conn *net.UDPConn, timeout int) (ra net.Addr, head *openP2PHeader, head = &openP2PHeader{} err = binary.Read(bytes.NewReader(result[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { - gLog.Println(LevelERROR, "parse p2pheader error:", err) + gLog.Println(LvERROR, "parse p2pheader error:", err) return nil, nil, nil, 0, err } return diff --git a/underlay.go b/underlay.go new file mode 100644 index 0000000..86bcb4b --- /dev/null +++ b/underlay.go @@ -0,0 +1,16 @@ +package main + +import ( + "time" +) + +type underlay interface { + ReadBuffer() (*openP2PHeader, []byte, error) + WriteBytes(uint16, uint16, []byte) error + WriteBuffer([]byte) error + WriteMessage(uint16, uint16, interface{}) error + Close() error + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error + Protocol() string +} diff --git a/underlay_quic.go b/underlay_quic.go new file mode 100644 index 0000000..41c6b7a --- /dev/null +++ b/underlay_quic.go @@ -0,0 +1,155 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "fmt" + "io" + "math/big" + "net" + "sync" + "time" + + "github.com/lucas-clemente/quic-go" +) + +//quic.DialContext do not support version 44,disable it +var quicVersion []quic.VersionNumber + +type underlayQUIC struct { + listener quic.Listener + writeMtx *sync.Mutex + quic.Stream + quic.Connection +} + +func (conn *underlayQUIC) Protocol() string { + return "quic" +} + +func (conn *underlayQUIC) ReadBuffer() (*openP2PHeader, []byte, error) { + headBuf := make([]byte, openP2PHeaderSize) + _, err := io.ReadFull(conn, headBuf) + if err != nil { + return nil, nil, err + } + head, err := decodeHeader(headBuf) + if err != nil { + return nil, nil, err + } + dataBuf := make([]byte, head.DataLen) + _, err = io.ReadFull(conn, dataBuf) + return head, dataBuf, err +} + +func (conn *underlayQUIC) WriteBytes(mainType uint16, subType uint16, data []byte) error { + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err := conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayQUIC) WriteBuffer(data []byte) error { + conn.writeMtx.Lock() + _, err := conn.Write(data) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayQUIC) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { + // TODO: call newMessage + data, err := json.Marshal(packet) + if err != nil { + return err + } + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err = conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayQUIC) Close() error { + conn.Stream.CancelRead(1) + conn.Connection.CloseWithError(0, "") + return nil +} +func (conn *underlayQUIC) CloseListener() { + if conn.listener != nil { + conn.listener.Close() + } +} + +func (conn *underlayQUIC) Accept() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + sess, err := conn.listener.Accept(ctx) + if err != nil { + return err + } + stream, err := sess.AcceptStream(context.Background()) + if err != nil { + return err + } + conn.Stream = stream + conn.Connection = sess + return nil +} + +func listenQuic(addr string, idleTimeout time.Duration) (*underlayQUIC, error) { + gLog.Println(LvDEBUG, "quic listen on ", addr) + listener, err := quic.ListenAddr(addr, generateTLSConfig(), + &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) + if err != nil { + return nil, fmt.Errorf("quic.ListenAddr error:%s", err) + } + return &underlayQUIC{listener: listener, writeMtx: &sync.Mutex{}}, nil +} + +func dialQuic(conn *net.UDPConn, remoteAddr *net.UDPAddr, idleTimeout time.Duration) (*underlayQUIC, error) { + tlsConf := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{"openp2pv1"}, + } + Connection, err := quic.DialContext(context.Background(), conn, remoteAddr, conn.LocalAddr().String(), tlsConf, + &quic.Config{Versions: quicVersion, MaxIdleTimeout: idleTimeout, DisablePathMTUDiscovery: true}) + if err != nil { + return nil, fmt.Errorf("quic.DialContext error:%s", err) + } + stream, err := Connection.OpenStreamSync(context.Background()) + if err != nil { + return nil, fmt.Errorf("OpenStreamSync error:%s", err) + } + qConn := &underlayQUIC{nil, &sync.Mutex{}, stream, Connection} + return qConn, nil +} + +// Setup a bare-bones TLS config for the server +func generateTLSConfig() *tls.Config { + key, err := rsa.GenerateKey(rand.Reader, 1024) + if err != nil { + panic(err) + } + template := x509.Certificate{SerialNumber: big.NewInt(1)} + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + panic(err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + panic(err) + } + return &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + NextProtos: []string{"openp2pv1"}, + } +} diff --git a/underlay_tcp.go b/underlay_tcp.go new file mode 100644 index 0000000..65d388e --- /dev/null +++ b/underlay_tcp.go @@ -0,0 +1,92 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net" + "sync" + "time" +) + +type underlayTCP struct { + listener net.Listener + writeMtx *sync.Mutex + net.Conn +} + +func (conn *underlayTCP) Protocol() string { + return "tcp" +} + +func (conn *underlayTCP) ReadBuffer() (*openP2PHeader, []byte, error) { + headBuf := make([]byte, openP2PHeaderSize) + _, err := io.ReadFull(conn, headBuf) + if err != nil { + return nil, nil, err + } + head, err := decodeHeader(headBuf) + if err != nil { + return nil, nil, err + } + dataBuf := make([]byte, head.DataLen) + _, err = io.ReadFull(conn, dataBuf) + return head, dataBuf, err +} + +func (conn *underlayTCP) WriteBytes(mainType uint16, subType uint16, data []byte) error { + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err := conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP) WriteBuffer(data []byte) error { + conn.writeMtx.Lock() + _, err := conn.Write(data) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { + // TODO: call newMessage + data, err := json.Marshal(packet) + if err != nil { + return err + } + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err = conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP) Close() error { + return conn.Conn.Close() +} + +func listenTCP(port int, idleTimeout time.Duration) (*underlayTCP, error) { + addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + defer l.Close() + l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) + c, err := l.Accept() + defer l.Close() + if err != nil { + return nil, err + } + return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil +} + +func dialTCP(host string, port int) (*underlayTCP, error) { + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) + if err != nil { + fmt.Printf("Dial %s:%d error:%s", host, port, err) + return nil, err + } + return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil +} diff --git a/underlay_tcp6.go b/underlay_tcp6.go new file mode 100644 index 0000000..84bcc94 --- /dev/null +++ b/underlay_tcp6.go @@ -0,0 +1,92 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net" + "sync" + "time" +) + +type underlayTCP6 struct { + listener net.Listener + writeMtx *sync.Mutex + net.Conn +} + +func (conn *underlayTCP6) Protocol() string { + return "tcp6" +} + +func (conn *underlayTCP6) ReadBuffer() (*openP2PHeader, []byte, error) { + headBuf := make([]byte, openP2PHeaderSize) + _, err := io.ReadFull(conn, headBuf) + if err != nil { + return nil, nil, err + } + head, err := decodeHeader(headBuf) + if err != nil { + return nil, nil, err + } + dataBuf := make([]byte, head.DataLen) + _, err = io.ReadFull(conn, dataBuf) + return head, dataBuf, err +} + +func (conn *underlayTCP6) WriteBytes(mainType uint16, subType uint16, data []byte) error { + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err := conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP6) WriteBuffer(data []byte) error { + conn.writeMtx.Lock() + _, err := conn.Write(data) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP6) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { + // TODO: call newMessage + data, err := json.Marshal(packet) + if err != nil { + return err + } + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + conn.writeMtx.Lock() + _, err = conn.Write(writeBytes) + conn.writeMtx.Unlock() + return err +} + +func (conn *underlayTCP6) Close() error { + return conn.Conn.Close() +} + +func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { + addr, _ := net.ResolveTCPAddr("tcp6", fmt.Sprintf("0.0.0.0:%d", port)) + l, err := net.ListenTCP("tcp6", addr) + if err != nil { + return nil, err + } + defer l.Close() + l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) + c, err := l.Accept() + defer l.Close() + if err != nil { + return nil, err + } + return &underlayTCP6{writeMtx: &sync.Mutex{}, Conn: c}, nil +} + +func dialTCP6(host string, port int) (*underlayTCP6, error) { + c, err := net.DialTimeout("tcp6", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) + if err != nil { + fmt.Printf("Dial %s:%d error:%s", host, port, err) + return nil, err + } + return &underlayTCP6{writeMtx: &sync.Mutex{}, Conn: c}, nil +} diff --git a/update.go b/update.go index b5ed136..6a0eb42 100644 --- a/update.go +++ b/update.go @@ -17,8 +17,8 @@ import ( ) func update() { - gLog.Println(LevelINFO, "update start") - defer gLog.Println(LevelINFO, "update end") + gLog.Println(LvINFO, "update start") + defer gLog.Println(LvINFO, "update end") c := http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -29,43 +29,43 @@ func update() { goarch := runtime.GOARCH rsp, err := c.Get(fmt.Sprintf("https://openp2p.cn:27183/api/v1/update?fromver=%s&os=%s&arch=%s", OpenP2PVersion, goos, goarch)) if err != nil { - gLog.Println(LevelERROR, "update:query update list failed:", err) + gLog.Println(LvERROR, "update:query update list failed:", err) return } defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { - gLog.Println(LevelERROR, "get update info error:", rsp.Status) + gLog.Println(LvERROR, "get update info error:", rsp.Status) return } rspBuf, err := ioutil.ReadAll(rsp.Body) if err != nil { - gLog.Println(LevelERROR, "update:read update list failed:", err) + gLog.Println(LvERROR, "update:read update list failed:", err) return } updateInfo := UpdateInfo{} err = json.Unmarshal(rspBuf, &updateInfo) if err != nil { - gLog.Println(LevelERROR, rspBuf, " update info decode error:", err) + gLog.Println(LvERROR, rspBuf, " update info decode error:", err) return } if updateInfo.Error != 0 { - gLog.Println(LevelERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail) + gLog.Println(LvERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail) return } err = updateFile(updateInfo.Url, "", "openp2p") if err != nil { - gLog.Println(LevelERROR, "update: download failed:", err) + gLog.Println(LvERROR, "update: download failed:", err) return } } // todo rollback on error func updateFile(url string, checksum string, dst string) error { - gLog.Println(LevelINFO, "download ", url) + gLog.Println(LvINFO, "download ", url) tmpFile := filepath.Dir(os.Args[0]) + "/openp2p.tmp" output, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0776) if err != nil { - gLog.Printf(LevelERROR, "OpenFile %s error:%s", tmpFile, err) + gLog.Printf(LvERROR, "OpenFile %s error:%s", tmpFile, err) return err } tr := &http.Transport{ @@ -74,31 +74,31 @@ func updateFile(url string, checksum string, dst string) error { client := &http.Client{Transport: tr} response, err := client.Get(url) if err != nil { - gLog.Printf(LevelERROR, "download url %s error:%s", url, err) + gLog.Printf(LvERROR, "download url %s error:%s", url, err) output.Close() return err } defer response.Body.Close() n, err := io.Copy(output, response.Body) if err != nil { - gLog.Printf(LevelERROR, "io.Copy error:%s", err) + gLog.Printf(LvERROR, "io.Copy error:%s", err) output.Close() return err } output.Sync() output.Close() - gLog.Println(LevelINFO, "download ", url, " ok") - gLog.Printf(LevelINFO, "size: %d bytes", n) + gLog.Println(LvINFO, "download ", url, " ok") + gLog.Printf(LvINFO, "size: %d bytes", n) err = os.Rename(os.Args[0], os.Args[0]+"0") if err != nil && os.IsExist(err) { - gLog.Printf(LevelINFO, " rename %s error:%s", os.Args[0], err) + gLog.Printf(LvINFO, " rename %s error:%s", os.Args[0], err) } // extract - gLog.Println(LevelINFO, "extract files") + gLog.Println(LvINFO, "extract files") err = extract(filepath.Dir(os.Args[0]), tmpFile) if err != nil { - gLog.Printf(LevelERROR, "extract error:%s. revert rename", err) + gLog.Printf(LvERROR, "extract error:%s. revert rename", err) os.Rename(os.Args[0]+"0", os.Args[0]) return err } diff --git a/upnp.go b/upnp.go new file mode 100644 index 0000000..d9aa86a --- /dev/null +++ b/upnp.go @@ -0,0 +1,404 @@ +/* +Taken from taipei-torrent + +Just enough UPnP to be able to forward ports +*/ +package main + +// BUG(jae): TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh + +import ( + "bytes" + "encoding/xml" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "strconv" + "strings" + "time" +) + +type upnpNAT struct { + serviceURL string + ourIP string + urnDomain string +} + +// protocol is either "udp" or "tcp" +type NAT interface { + GetExternalAddress() (addr net.IP, err error) + AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) + DeletePortMapping(protocol string, externalPort, internalPort int) (err error) +} + +func Discover() (nat NAT, err error) { + ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900") + if err != nil { + return + } + conn, err := net.ListenPacket("udp4", ":0") + if err != nil { + return + } + socket := conn.(*net.UDPConn) + defer socket.Close() // nolint: errcheck + + if err := socket.SetDeadline(time.Now().Add(3 * time.Second)); err != nil { + return nil, err + } + + st := "InternetGatewayDevice:1" + + buf := bytes.NewBufferString( + "M-SEARCH * HTTP/1.1\r\n" + + "HOST: 239.255.255.250:1900\r\n" + + "ST: ssdp:all\r\n" + + "MAN: \"ssdp:discover\"\r\n" + + "MX: 2\r\n\r\n") + message := buf.Bytes() + answerBytes := make([]byte, 1024) + for i := 0; i < 3; i++ { + _, err = socket.WriteToUDP(message, ssdp) + if err != nil { + return + } + var n int + _, _, err = socket.ReadFromUDP(answerBytes) + if err != nil { + return + } + + for { + n, _, err = socket.ReadFromUDP(answerBytes) + if err != nil { + break + } + answer := string(answerBytes[0:n]) + if !strings.Contains(answer, st) { + continue + } + // HTTP header field names are case-insensitive. + // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 + locString := "\r\nlocation:" + answer = strings.ToLower(answer) + locIndex := strings.Index(answer, locString) + if locIndex < 0 { + continue + } + loc := answer[locIndex+len(locString):] + endIndex := strings.Index(loc, "\r\n") + if endIndex < 0 { + continue + } + locURL := strings.TrimSpace(loc[0:endIndex]) + var serviceURL, urnDomain string + serviceURL, urnDomain, err = getServiceURL(locURL) + if err != nil { + return + } + var ourIP net.IP + ourIP, err = localIPv4() + if err != nil { + return + } + nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP.String(), urnDomain: urnDomain} + return + } + } + err = errors.New("UPnP port discovery failed") + return +} + +type Envelope struct { + XMLName xml.Name `xml:"http://schemas.xmlsoap.org/soap/envelope/ Envelope"` + Soap *SoapBody +} +type SoapBody struct { + XMLName xml.Name `xml:"http://schemas.xmlsoap.org/soap/envelope/ Body"` + ExternalIP *ExternalIPAddressResponse +} + +type ExternalIPAddressResponse struct { + XMLName xml.Name `xml:"GetExternalIPAddressResponse"` + IPAddress string `xml:"NewExternalIPAddress"` +} + +type ExternalIPAddress struct { + XMLName xml.Name `xml:"NewExternalIPAddress"` + IP string +} + +type UPNPService struct { + ServiceType string `xml:"serviceType"` + ControlURL string `xml:"controlURL"` +} + +type DeviceList struct { + Device []Device `xml:"device"` +} + +type ServiceList struct { + Service []UPNPService `xml:"service"` +} + +type Device struct { + XMLName xml.Name `xml:"device"` + DeviceType string `xml:"deviceType"` + DeviceList DeviceList `xml:"deviceList"` + ServiceList ServiceList `xml:"serviceList"` +} + +type Root struct { + Device Device +} + +func getChildDevice(d *Device, deviceType string) *Device { + dl := d.DeviceList.Device + for i := 0; i < len(dl); i++ { + if strings.Contains(dl[i].DeviceType, deviceType) { + return &dl[i] + } + } + return nil +} + +func getChildService(d *Device, serviceType string) *UPNPService { + sl := d.ServiceList.Service + for i := 0; i < len(sl); i++ { + if strings.Contains(sl[i].ServiceType, serviceType) { + return &sl[i] + } + } + return nil +} + +func localIPv4() (net.IP, error) { + tt, err := net.Interfaces() + if err != nil { + return nil, err + } + for _, t := range tt { + aa, err := t.Addrs() + if err != nil { + return nil, err + } + for _, a := range aa { + ipnet, ok := a.(*net.IPNet) + if !ok { + continue + } + v4 := ipnet.IP.To4() + if v4 == nil || v4[0] == 127 { // loopback address + continue + } + return v4, nil + } + } + return nil, errors.New("cannot find local IP address") +} + +func getServiceURL(rootURL string) (url, urnDomain string, err error) { + r, err := http.Get(rootURL) + if err != nil { + return + } + defer r.Body.Close() // nolint: errcheck + + if r.StatusCode >= 400 { + err = errors.New(fmt.Sprint(r.StatusCode)) + return + } + var root Root + err = xml.NewDecoder(r.Body).Decode(&root) + if err != nil { + return + } + a := &root.Device + if !strings.Contains(a.DeviceType, "InternetGatewayDevice:1") { + err = errors.New("No InternetGatewayDevice") + return + } + + b := getChildDevice(a, "WANDevice:1") + if b == nil { + err = errors.New("No WANDevice") + return + } + c := getChildDevice(b, "WANConnectionDevice:1") + if c == nil { + err = errors.New("No WANConnectionDevice") + return + } + d := getChildService(c, "WANIPConnection:1") + if d == nil { + // Some routers don't follow the UPnP spec, and put WanIPConnection under WanDevice, + // instead of under WanConnectionDevice + d = getChildService(b, "WANIPConnection:1") + + if d == nil { + d = getChildService(c, "WANPPPConnection:1") + if d == nil { + err = errors.New("No WANIPConnection or WANPPPConnection") + return + } + + } + } + // Extract the domain name, which isn't always 'schemas-upnp-org' + urnDomain = strings.Split(d.ServiceType, ":")[1] + url = combineURL(rootURL, d.ControlURL) + return +} + +func combineURL(rootURL, subURL string) string { + protocolEnd := "://" + protoEndIndex := strings.Index(rootURL, protocolEnd) + a := rootURL[protoEndIndex+len(protocolEnd):] + rootIndex := strings.Index(a, "/") + return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL +} + +func soapRequest(url, function, message, domain string) (r *http.Response, err error) { + fullMessage := "" + + "\r\n" + + "" + message + "" + + req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"") + req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3") + //req.Header.Set("Transfer-Encoding", "chunked") + req.Header.Set("SOAPAction", "\"urn:"+domain+":service:WANIPConnection:1#"+function+"\"") + req.Header.Set("Connection", "Close") + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Pragma", "no-cache") + + // log.Stderr("soapRequest ", req) + + r, err = http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + /*if r.Body != nil { + defer r.Body.Close() + }*/ + + if r.StatusCode >= 400 { + // log.Stderr(function, r.StatusCode) + err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function) + r = nil + return + } + return +} + +type statusInfo struct { + externalIpAddress string +} + +func (n *upnpNAT) getExternalIPAddress() (info statusInfo, err error) { + + message := "\r\n" + + "" + + var response *http.Response + response, err = soapRequest(n.serviceURL, "GetExternalIPAddress", message, n.urnDomain) + if response != nil { + defer response.Body.Close() // nolint: errcheck + } + if err != nil { + return + } + + var envelope Envelope + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return + } + + reader := bytes.NewReader(data) + err = xml.NewDecoder(reader).Decode(&envelope) + if err != nil { + return + } + + info = statusInfo{envelope.Soap.ExternalIP.IPAddress} + + if err != nil { + return + } + + return +} + +// GetExternalAddress returns an external IP. If GetExternalIPAddress action +// fails or IP returned is invalid, GetExternalAddress returns an error. +func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) { + info, err := n.getExternalIPAddress() + if err != nil { + return + } + addr = net.ParseIP(info.externalIpAddress) + if addr == nil { + err = fmt.Errorf("Failed to parse IP: %v", info.externalIpAddress) + } + + return +} + +func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) { + // A single concatenation would break ARM compilation. + message := "\r\n" + + "" + strconv.Itoa(externalPort) + message += "" + protocol + "" + message += "" + strconv.Itoa(internalPort) + "" + + "" + n.ourIP + "" + + "1" + message += description + + "" + strconv.Itoa(timeout) + + "" + + var response *http.Response + response, err = soapRequest(n.serviceURL, "AddPortMapping", message, n.urnDomain) + if response != nil { + defer response.Body.Close() // nolint: errcheck + } + if err != nil { + return + } + + // TODO: check response to see if the port was forwarded + // log.Println(message, response) + // JAE: + // body, err := ioutil.ReadAll(response.Body) + // fmt.Println(string(body), err) + mappedExternalPort = externalPort + _ = response + return +} + +func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) { + + message := "\r\n" + + "" + strconv.Itoa(externalPort) + + "" + protocol + "" + + "" + + var response *http.Response + response, err = soapRequest(n.serviceURL, "DeletePortMapping", message, n.urnDomain) + if response != nil { + defer response.Body.Close() // nolint: errcheck + } + if err != nil { + return + } + + // TODO: check response to see if the port was deleted + // log.Println(message, response) + _ = response + return +} diff --git a/util_darwin.go b/util_darwin.go new file mode 100644 index 0000000..59208ee --- /dev/null +++ b/util_darwin.go @@ -0,0 +1,32 @@ +package main + +import ( + "strings" + "syscall" +) + +const ( + defaultInstallPath = "/usr/local/openp2p" + defaultBinName = "openp2p" +) + +func getOsName() (osName string) { + output := execOutput("sw_vers", "-productVersion") + osName = "Mac OS X " + strings.TrimSpace(output) + return +} + +func setRLimit() error { + var limit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + limit.Cur = 10240 + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + return nil +} + +func setFirewall() { +} diff --git a/util_linux.go b/util_linux.go new file mode 100644 index 0000000..b616ecc --- /dev/null +++ b/util_linux.go @@ -0,0 +1,72 @@ +package main + +import ( + "bufio" + "bytes" + "io/ioutil" + "os" + "strings" + "syscall" +) + +const ( + defaultInstallPath = "/usr/local/openp2p" + defaultBinName = "openp2p" +) + +func getOsName() (osName string) { + var sysnamePath string + sysnamePath = "/etc/redhat-release" + _, err := os.Stat(sysnamePath) + if err != nil && os.IsNotExist(err) { + str := "PRETTY_NAME=" + f, err := os.Open("/etc/os-release") + if err != nil && os.IsNotExist(err) { + str = "DISTRIB_ID=" + f, err = os.Open("/etc/openwrt_release") + } + if err == nil { + buf := bufio.NewReader(f) + for { + line, err := buf.ReadString('\n') + if err == nil { + line = strings.TrimSpace(line) + pos := strings.Count(line, str) + if pos > 0 { + len1 := len([]rune(str)) + 1 + rs := []rune(line) + osName = string(rs[len1 : (len(rs))-1]) + break + } + } else { + break + } + } + } + } else { + buff, err := ioutil.ReadFile(sysnamePath) + if err == nil { + osName = string(bytes.TrimSpace(buff)) + } + } + if osName == "" { + osName = "Linux" + } + return +} + +func setRLimit() error { + var limit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + limit.Max = 1024 * 1024 + limit.Cur = limit.Max + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil { + return err + } + return nil +} + +func setFirewall() { +} diff --git a/util_windows.go b/util_windows.go new file mode 100644 index 0000000..e11484f --- /dev/null +++ b/util_windows.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "golang.org/x/sys/windows/registry" +) + +const ( + defaultInstallPath = "C:\\Program Files\\OpenP2P" + defaultBinName = "openp2p.exe" +) + +func getOsName() (osName string) { + k, err := registry.OpenKey(registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Windows NT\CurrentVersion`, registry.QUERY_VALUE|registry.WOW64_64KEY) + if err != nil { + return + } + defer k.Close() + pn, _, err := k.GetStringValue("ProductName") + if err == nil { + osName = pn + } + return +} + +func setRLimit() error { + return nil +} + +func setFirewall() { + fullPath, err := filepath.Abs(os.Args[0]) + if err != nil { + gLog.Println(LvERROR, "add firewall error:", err) + return + } + isXP := false + osName := getOsName() + if strings.Contains(osName, "XP") || strings.Contains(osName, "2003") { + isXP = true + } + if isXP { + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall del allowedprogram "%s"`, fullPath)).Run() + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh firewall add allowedprogram "%s" "%s" ENABLE`, ProducnName, fullPath)).Run() + } else { // win7 or later + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall del rule name="%s"`, ProducnName)).Run() + exec.Command("cmd.exe", `/c`, fmt.Sprintf(`netsh advfirewall firewall add rule name="%s" dir=in action=allow program="%s" enable=yes`, ProducnName, fullPath)).Run() + } +}