在网络分层的七层协议中,我们知道TCP在HTTP层之下。本质上HTTP 包的解析是基于底层 TCP 连接建立的。
TCP Connection Distinct:计算机之间建立网络连接,也称为TCP握手,本质上是两个文件句柄的关联,即fd,每个网络连接由四个属性唯一标识:
操作系统套接字:操作系统套接字用作在服务器端和客户端程序之间建立双向网络通信链接的端点。
在 Linux 机器上,可以通过以下网络参数设置 TCP keepalive 机制:
# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
# cat /proc/sys/net/ipv4/tcp_keepalive_intvl
75
# cat /proc/sys/net/ipv4/tcp_keepalive_probes
9以上是默认设置,这意味着初始连接创建将在两小时(7200 秒)后每 75 秒重新发送一次。如果连续 9 次没有收到 ACK 响应,则连接被标记为断开。
在 Go 原生的 net 包中,可以使用 TCP 连接的 keep-alive 机制设置以下功能:
接下来,我们先用代码演示一个TCP连接demo进行交互
TCP传输的结构体
我们为客户端和服务端的交互定义了两种结构,传输协议用json演示
type Message struct {
Uid string
Val string
}
type Resp struct {
Uid string
Val string
Ts string
}const TAG = "server: hello, "
func transfer(conn net.Conn) {
defer func() {
remoteAddr := conn.RemoteAddr().String()
log.Print("discard remove add:", remoteAddr)
conn.Close()
}()
// set 10 seconds to close the connection
//conn.SetDeadline(time.Now().Add(10 * time.Second))
for {
var msg body.Message
if err := json.NewDecoder(conn).Decode(&msg); err != nil && err != io.EOF {
log.Printf("Decode from client err: %v", err)
return
}
if msg.Uid != "" || msg.Val != "" {
//conn.Write([]byte(msg.Val))
var rsp body.Resp
rsp.Uid = msg.Uid
rsp.Val = TAG + msg.Val
ser, _ := json.Marshal(msg)
conn.Write(append(ser, '
'))
}
}
}
func ListenAndServer() {
log.Print("Start server...")
// start listening on local tcp port 3000
listen, err := net.Listen("tcp", "0.0.0.0:3000")
if err != nil {
log.Fatal("Listen failed. msg: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
log.Printf("accept failed, err: %v", err)
continue
}
go transfer(conn)
}
}type IConn interface {
Close() error
}
// for each connection
type Conn struct {
addr string
tcp *net.TCPConn // tcp connection can be any types implement of database(Redis,MySQL,Kafka)
ctx context.Context
writer *bufio.Writer
cnlFun context.CancelFunc // tsed to notify the end of ctx
retChan *sync.Map // the map that stores the channel result set, which belongs to the unified connection
err error
}
// Implement the Close() function signature for Conn to close the connection, close the message channel
func (c *Conn) Close() (err error) {
if c.cnlFun != nil {
c.cnlFun()
}
if c.tcp != nil {
err = c.tcp.Close()
}
if c.retChan != nil {
c.retChan.Range(func(key, value interface{}) bool {
// convert the channel type according to the specific business assertion
if ch, ok := value.(chan string); ok {
close(ch)
}
return true
})
}
return
}type Option struct {
addr string
size int
readTimeout
time.Duration dialTimeout time.Duration
keepAlive time.Duration
}
func NewConn(opt *Option) (c *Conn, err error) {
// initialize connection
c = &Conn{
addr: opt.addr,
retChan: new(sync.Map),
//err: nil,
}
defer func() {
if err != nil {
if c != nil {
c.Close()
}
}
}()
// dial
var conn net.Conn
if conn, err = net.DialTimeout("tcp", opt.addr, opt.dialTimeout); err != nil {
return
} else {
c.tcp = conn.(*net.TCPConn)
}
c.writer = bufio.NewWriter(c.tcp)
//if err = c.tcp.SetKeepAlive(true); err != nil {
if err = c.tcp.SetKeepAlive(false); err != nil {
return
}
if err = c.tcp.SetKeepAlivePeriod(opt.keepAlive); err != nil {
return
}
if err = c.tcp.SetLinger(0); err != nil {
return
}
// create context management
c.ctx, c.cnlFun = context.WithCancel(context.Background())
// receive results asynchronously to the corresponding result set
go receiveResp(c)
return
}异步接收结果
主要进行异步轮询的receiveResp()函数有几个功能:
// receive the data from tcp connection
func receiveResp(c *Conn) {
scanner := bufio.NewScanner(c.tcp)
for {
select {
case <-c.ctx.Done():
// c.cnlFun() is executed, if the connection pool is closed
return
default:
if scanner.Scan() {
rsp := new(body.Resp)
if err := json.Unmarshal(scanner.Bytes(), rsp); err != nil {
return
}
// the response id corresponds to the request id
uid := rsp.Uid
if load, ok := c.retChan.Load(uid); ok {
c.retChan.Delete(uid)
// message channel
if ch, ok := load.(chan string); ok {
ch <- rsp.Ts + ": " + rsp.Val
// close on write side
close(ch)
}
}
} else {
if scanner.Err() != nil {
c.err = scanner.Err()
} else {
c.err = errors.New("scanner done")
}
c.Close()
return
}
}
}
}
func (c *Conn) Send(ctx context.Context, msg *body.Message) (ch chan string, err error) {
ch = make(chan string)
c.retChan.Store(msg.Uid, ch)
js, _ := json.Marshal(msg)
_, err = c.writer.Write(js)
if err != nil {
return
}
err = c.writer.Flush()
// 连接没有关闭,可以稍后放入连接池
//c.tcp.CloseWrite()
return
}=== RUN TestListenAndServer
2022/08/25 16:58:20 启动服务器...var OPT = &Option{
addr: "0.0.0.0:3000",
size: 3,
readTimeout: 3 * time.Second,
dialTimeout: 3 * time.Second,
keepAlive: 1 * time.Second,
}
func createConn(opt *Option) *Conn {
c, err := NewConn(opt)
if err != nil {
panic(err)
}
return c
}
func TestSendMsg(t *testing.T) {
c := createConn(OPT)
msg := &body.Message{Uid: "pixel-1", Val: "pixelpig!"}
rec, err := c.Send(context.Background(), msg)
if err != nil {
t.Error(err)
} else {
t.Logf("rec1: %+v", <-rec)
}
msg.Val = "another pig!"
rec2, err := c.Send(context.Background(), msg)
if err != nil {
t.Error(err)
} else {
t.Logf("rec2: %+v", <-rec2)
}
t.Log("finished")
}=== RUN TestSendMsg
TestSendMsg: conn_test.go:56: rec1: : pixelpig!
TestSendMsg: conn_test.go:64: rec2: : another pig!
TestSendMsg: conn_test.go:66: finished
--- PASS: TestSendMsg (9.94s)
PASS以上是比较简单的点对点交互。其实连接交互超时也可以在后面考虑:
超时判断
判断超时的方法有很多种,比较常见的一种是使用select{}块和time.After()。我们来看看常见的实现:
rec3, err := c.Send(context.Background(), msg)
if err == nil {
select {
case resp := <-rec3:
t.Logf("rec3: %+v", resp)
return
case <-time.After(time.Second * 1):
t.Error("Wait for resp timeout!")
return
}
} else {
t.Error(err)
}=== RUN TestSendMsg
TestSendMsg: conn_test.go:56: rec1: : pixelpig!
TestSendMsg: conn_test.go:76: Wait for resp timeout!
--- FAIL: TestSendMsg (17.99s)
FAIL连接池要考虑的情况会复杂一点,后面再写一篇文章来说。
| 留言与评论(共有 0 条评论) “” |