`

记录golang及nginx-lua的tcp通讯代码

 
阅读更多

golang代码如下(src/simple/main.go)

package main

import (
	"log"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	msg_length = 10240
)

type Control struct {
	conn  net.TCPConn
	mutex sync.Mutex
}

//store request map
var requestMap map[string]*Control

func proxy(conn *net.TCPConn) {

	defer conn.Close()

	var data = make([]byte, 1024)
	for {
		_, err := conn.Read(data)
		if err != nil {
			log.Println("proxy disconnect from " + conn.RemoteAddr().String())
			return
		}
		var commands = strings.Split(string(data), " ")
		if len(commands) <= 1 || commands[1] == "" {
			log.Println("proxy receive bad commands : " + string(data))
			return
		}
		var id = commands[0]

		var n, tn int
		if rclient, ok := requestMap[id]; ok {
			log.Println("client " + commands[0] + " do command " + commands[1])
			_, err = rclient.conn.Write([]byte(commands[1] + "\r\n"))
			if err != nil {
				rclient.conn.Close()
				delete(requestMap, id)
				break
			}

			var msglen = 0
			var msg []string
			var tdata = make([]byte, 2048)
			var rdata []byte
			tn = 0
			for {
				rclient.mutex.Lock()
				rclient.conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
				n, err = rclient.conn.Read(tdata)
				rclient.mutex.Unlock()
				if err != nil {
					if strings.HasSuffix(err.Error(), "timeout") == false {
						rclient.conn.Close()
						delete(requestMap, id)
						break
					}
				}
				if n > 0 {
					if tn == 0 && n > 0 {
						msg = strings.Split(string(tdata[:n]), "_")
						msglen, err = strconv.Atoi(msg[0])
						if err != nil {
							break
						}
						rdata = append(rdata, tdata[len(msg[0])+1:n]...)

					} else {
						rdata = append(rdata, tdata[0:n]...)
					}
					tn = tn + n
					if tn >= msglen+len(msg[0])+1 {
						break
					}
				}
			}
			conn.Write(rdata[:msglen])
		} else {
			log.Println("client is offline " + id)
			conn.Write([]byte("offline"))
		}

		break
	}
}

//
func TcpMain(ip string, port int, router bool) {
	//start tcp server
	listen, err := net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP(ip), port, ""})
	if err != nil {
		log.Fatalln("listen port error")
		return
	}
	log.Println("start tcp server " + ip + " " + strconv.Itoa(port))
	defer listen.Close()

	//listen new request
	var id string
	for {
		conn, err := listen.AcceptTCP()
		if err != nil {
			log.Println("receive connection failed")
			continue
		}
		if router == true {

			var tdata = make([]byte, 256)
			conn.SetReadDeadline(time.Now().Add(3 * time.Second))
			var n, err = conn.Read(tdata)
			if err != nil {
				conn.Close()
				continue
			}
			id = string(tdata[0:n])

			if rcontrol, ok := requestMap[id]; ok {
				rcontrol.mutex.Lock()
				rcontrol.conn.Close()
				rcontrol.mutex.Unlock()
				delete(requestMap, id)
			}
			log.Println("connected from " + id + " addr " + conn.RemoteAddr().String())
			var control = &Control{}
			control.conn = *conn
			requestMap[id] = control
		} else {
			go proxy(conn)
		}

	}
}

func main() {
	requestMap = make(map[string]*Control)

	go TcpMain("127.0.0.1", 1988, false)
	TcpMain("0.0.0.0", 1987, true)
}

 

nginx-lua配置如下(其中lua-resty-http使用这个版本

lua_package_path "/usr/local/openresty/nginx/conf/lua-resty-http-master/lib/?.lua;;";
init_worker_by_lua_file /usr/local/openresty/nginx/conf/connect.lua;
lua_socket_keepalive_timeout 600;

conf/connect.lua代码如下

local function proxy()
        local shutdown = 0

        while true do
                if ngx.worker.exiting() then
                        break
                end

                local tcpsocket = ngx.socket.tcp()
                local ok, err = tcpsocket:connect("127.0.0.1", 1987)
                if not ok then
                        ngx.timer.at(5,proxy)
                        break
                else
                        tcpsocket:send("0")
                        
                        while true do
                                if ngx.worker.exiting() then
                                        shutdown = 1
                                        break
                                end
                                local result = ""
                                tcpsocket:settimeout(600000)
                                local line, err, partial = tcpsocket:receive()
                                if not line then
                                        tcpsocket:close()
                                        break
                                else
                                        line = string.gsub(line, "^%s*(.-)%s*$", "%1")

                                        local http = require("resty.http")
                                        local httpc = http.new()
                                        local res, err = httpc:request_uri("http://www.ciaos.com"..line, {
                                                method = "POST",
                                                body = "username=root",
                                                headers = {
                                                        ["Content-Type"] = "application/x-www-form-urlencoded",
                                                }
                                        })

                                        if not res then
                                                tcpsocket.send("0_err")
                                        else                                           
                                                local len = string.len(res.body)
                                                tcpsocket:send(len .. "_" .. res.body)
                                        end  
                                end          
                        end                                 
                                                    
                        if shutdown == 1 then
                                break  
                        end                      
                end                                         
        end                                                                   
end                                             
                                                         
local ok, err = ngx.timer.at(0,proxy)        
if not ok then                         
        ngx.log(ngx.ERR, "timer error")                                               
end 

php测试代码

public function console()
{
        $command= $this->input->get("cmd");
        if(is_null($command) or $command == false){
                echo "cmd not found";
                return;
        }
        $port = 1988;
        $ip = "127.0.0.1";
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($socket < 0) {
                echo "socket_create() failed: reason: " . socket_strerror($socket) . "\n";
                return;
        }

        $result = socket_connect($socket, $ip, $port);
        if ($result < 0) {
                echo "socket_connect() failed.\nReason: ($result) " . socket_strerror($result) . "\n";
                return;
        }

        $in = "0 $command end";
        if(!socket_write($socket, $in, strlen($in))) {
                echo "socket_write() failed: reason: " . socket_strerror($socket) . "\n";
        }else {
        }

        $out = socket_read($socket, 8192);
        echo $out;
        socket_close($socket);
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics