谈谈 Go:网络编程之TCP通信

在网络分层的七层协议中,我们知道TCP在HTTP层之下。本质上HTTP 包的解析是基于底层 TCP 连接建立的。

TCP 协议握手

TCP Connection Distinct:计算机之间建立网络连接,也称为TCP握手,本质上是两个文件句柄的关联,即fd,每个网络连接由四个属性唯一标识:,所以一台机器的连接数为受文件句柄限制ulimit

操作系统套接字:操作系统套接字用作在服务器端和客户端程序之间建立双向网络通信链接的端点。


KeepAlive在不同场景下的解释

  • HTTP keepalive:
    众所周知,HTTP 连接是无状态的。通常,连接用完时会被破坏。开启keepalive可以告诉它保持连接一段时间,避免频繁地连接重建。
  • TCP存活:许多现有的 TCP 协议通过定义某种心跳机制来支持这种错误处理方式,该机制要求每个端点定期发送 PING/PONG 探测,以检测网络问题以及服务健康状况。


Linux 网络参数

在 Linux 机器上,可以通过以下网络参数设置 TCP keepalive 机制:

 # cat /proc/sys/net/ipv4/tcp_keepalive_time
  7200

  # cat /proc/sys/net/ipv4/tcp_keepalive_intvl
  75

  # cat /proc/sys/net/ipv4/tcp_keepalive_probes
  9

以上是默认设置,这意味着初始连接创建将在两小时(7200 秒)后每 75 秒重新发送一次。如果连续 9 次没有收到 ACK 响应,则连接被标记为断开。


Go TCP连接

在 Go 原生的 net 包中,可以使用 TCP 连接的 keep-alive 机制设置以下功能:

  • func (c *TCPConn) SetKeepAlive(keepalive bool) error
    是否开启连接检测
  • func (c *TCPConn) SetKeepAlivePeriod(d time.Duration) error
    连接检测间隔,默认会使用操作系统参数设置


接下来,我们先用代码演示一个TCP连接demo进行交互

TCP传输的结构体
我们为客户端和服务端的交互定义了两种结构,传输协议用
json演示

type Message struct {
	Uid string
	Val string
}
type Resp struct {
	Uid string
	Val string
	Ts string
}


服务端代码

const TAG = "server: hello, "

func transfer(conn net.Conn) {
	defer func() {
		remoteAddr := conn.RemoteAddr().String()
		log.Print("discard remove add:", remoteAddr)
		conn.Close()
	}()

	// set 10 seconds to close the connection
	//conn.SetDeadline(time.Now().Add(10 * time.Second))

	for {
		var msg body.Message

		if err := json.NewDecoder(conn).Decode(&msg); err != nil && err != io.EOF {
			log.Printf("Decode from client err: %v", err)
			return
		}

		if msg.Uid != "" || msg.Val != "" {
			//conn.Write([]byte(msg.Val))
			var rsp body.Resp
			rsp.Uid = msg.Uid
			rsp.Val = TAG + msg.Val
			ser, _ := json.Marshal(msg)

			conn.Write(append(ser, '
'))
		}
	}
}

func ListenAndServer() {
	log.Print("Start server...")
	// start listening on local tcp port 3000
	listen, err := net.Listen("tcp", "0.0.0.0:3000")
	if err != nil {
		log.Fatal("Listen failed. msg: ", err)
		return
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Printf("accept failed, err: %v", err)
			continue
		}
		go transfer(conn)
	}
}

客户端代码

type IConn interface {
	Close() error
}

// for each connection
type Conn struct {
	addr    string              
	tcp     *net.TCPConn        // tcp connection can be any types implement of database(Redis,MySQL,Kafka)
	ctx     context.Context
	writer  *bufio.Writer
	cnlFun  context.CancelFunc // tsed to notify the end of ctx
	retChan *sync.Map          // the map that stores the channel result set, which belongs to the unified connection
	err     error
}

// Implement the Close() function signature for Conn to close the connection, close the message channel
func (c *Conn) Close() (err error) {
	if c.cnlFun != nil {
		c.cnlFun()
	}
	
	if c.tcp != nil {
		err = c.tcp.Close()
	}
	if c.retChan != nil {
		c.retChan.Range(func(key, value interface{}) bool {
			// convert the channel type according to the specific business assertion
			if ch, ok := value.(chan string); ok {
				close(ch)
			}
			return true
		})
	}
	return
}

定义TCP连接代码

type Option struct { 
	addr string 
	size int 
	readTimeout 
	time.Duration dialTimeout time.Duration 
	keepAlive time.Duration 
}

func NewConn(opt *Option) (c *Conn, err error) {
	// initialize connection
	c = &Conn{
		addr:    opt.addr,
		retChan: new(sync.Map),
		//err: nil,
	}

	defer func() {
		if err != nil {
			if c != nil {
				c.Close()
			}
		}
	}()

	// dial
	var conn net.Conn
	if conn, err = net.DialTimeout("tcp", opt.addr, opt.dialTimeout); err != nil {
		return
	} else {
		c.tcp = conn.(*net.TCPConn)
	}

	c.writer = bufio.NewWriter(c.tcp)

	//if err = c.tcp.SetKeepAlive(true); err != nil {
	if err = c.tcp.SetKeepAlive(false); err != nil {
		return
	}
	if err = c.tcp.SetKeepAlivePeriod(opt.keepAlive); err != nil {
	  return
	}
	if err = c.tcp.SetLinger(0); err != nil {
		return
	}

    // create context management
	c.ctx, c.cnlFun = context.WithCancel(context.Background())

	// receive results asynchronously to the corresponding result set
	go receiveResp(c)

	return
}

异步接收结果
主要进行异步轮询的
receiveResp()函数有几个功能:

  • 意识到上下文关闭,通常执行连接的 cancel()
  • 从服务器接收数据并写入结果通道retChan,其类型是并发安全的sync.Map
  • 监听服务器错误并关闭连接以发现异常
// receive the data from tcp connection
func receiveResp(c *Conn) {
	scanner := bufio.NewScanner(c.tcp)
	for {
		select {
		case <-c.ctx.Done():
			// c.cnlFun() is executed, if the connection pool is closed
			return
		default:
			if scanner.Scan() {
				rsp := new(body.Resp)
				if err := json.Unmarshal(scanner.Bytes(), rsp); err != nil {
					return
				}
				// the response id corresponds to the request id
				uid := rsp.Uid
				if load, ok := c.retChan.Load(uid); ok {
					c.retChan.Delete(uid)
					// message channel
					if ch, ok := load.(chan string); ok {
						ch <- rsp.Ts + ": " + rsp.Val
						// close on write side
						close(ch)
					}
				}
			} else {
				if scanner.Err() != nil {
					c.err = scanner.Err()
				} else {
					c.err = errors.New("scanner done")
				}
				c.Close()
				return
			}
		}
	}
}

func (c *Conn) Send(ctx context.Context, msg *body.Message) (ch chan string, err error) { 
	ch = make(chan string) 
	c.retChan.Store(msg.Uid, ch) 
	js, _ := json.Marshal(msg)
	_, err = c.writer.Write(js) 
	if err != nil { 
		return 
	}
	err = c.writer.Flush() 
	// 连接没有关闭,可以稍后放入连接池
	//c.tcp.CloseWrite() 
	return 
}


运行步骤

  • 启动服务器端监听:
=== RUN TestListenAndServer 
2022/08/25 16:58:20 启动服务器...
  • 测试函数代码
var OPT = &Option{
	addr:        "0.0.0.0:3000",
	size:        3,
	readTimeout: 3 * time.Second,
	dialTimeout: 3 * time.Second,
	keepAlive:   1 * time.Second,
}
func createConn(opt *Option) *Conn {
	c, err := NewConn(opt)
	if err != nil {
		panic(err)
	}
	return c
}
func TestSendMsg(t *testing.T) {
	c := createConn(OPT)
	msg := &body.Message{Uid: "pixel-1", Val: "pixelpig!"}
	rec, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		t.Logf("rec1: %+v", <-rec)
	}
	msg.Val = "another pig!"
	rec2, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		t.Logf("rec2: %+v", <-rec2)
	}
	t.Log("finished")
}
  • 客户端输出
=== RUN   TestSendMsg
    TestSendMsg: conn_test.go:56: rec1: : pixelpig!
    TestSendMsg: conn_test.go:64: rec2: : another pig!
    TestSendMsg: conn_test.go:66: finished
--- PASS: TestSendMsg (9.94s)
PASS

超时和连接池

以上是比较简单的点对点交互。其实连接交互超时也可以在后面考虑:

  1. 虽然连接结果是异步响应,但我们还是需要让响应超时,防止单个连接继续阻塞。
  2. 我们需要考虑复用,正常的连接放入连接池进行管理。

超时判断
判断超时的方法有很多种,比较常见的一种是使用
select{}块和time.After()。我们来看看常见的实现:

rec3, err := c.Send(context.Background(), msg)
if err == nil {
	select {
	case resp := <-rec3:
		t.Logf("rec3: %+v", resp)
		return
	case <-time.After(time.Second * 1):
		t.Error("Wait for resp timeout!")
		return
	}
} else {
	t.Error(err)
}
=== RUN   TestSendMsg
    TestSendMsg: conn_test.go:56: rec1: : pixelpig!
    TestSendMsg: conn_test.go:76: Wait for resp timeout!
--- FAIL: TestSendMsg (17.99s)
FAIL

连接池要考虑的情况会复杂一点,后面再写一篇文章来说。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章