Golang 后端微服务体系架构

解析组件

  • Rpcx(解析中……)
  • Appdash(待解析)
  • NSQ(待解析)
  • Hystrix-go(待解析)

Rpcx[RPC通信]

服务端

ServerPlugin

服务插件,用以很多服务启动的时候赋予额外的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//PluginContainer represents a plugin container that defines all methods to manage plugins.
//And it also defines all extension points.
type PluginContainer interface {
Add(plugin Plugin)
Remove(plugin Plugin)
All() []Plugin

DoRegister(name string, rcvr interface{}, metadata string) error
DoRegisterFunction(name string, fn interface{}, metadata string) error

DoPostConnAccept(net.Conn) (net.Conn, bool)

DoPreReadRequest(ctx context.Context) error
DoPostReadRequest(ctx context.Context, r *protocol.Message, e error) error

DoPreWriteResponse(context.Context, *protocol.Message) error
DoPostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error

DoPreWriteRequest(ctx context.Context) error
DoPostWriteRequest(ctx context.Context, r *protocol.Message, e error) error
}
  • Server#RegisterName -> Plugins.DoRegister
  • Server#Plugins.Add用户自己调用
  • Server#SendMessage -> Plugins.DoPreWriteRequest,DoPostWriteRequest
  • Server#serveListener -> Plugins.DoPostConnAccept
  • Server#serveConn -> Plugins.DoPreWriteResponse,DoPostWriteResponse
  • Server#readRequest -> Plugins.DoPreReadRequest,DoPostReadRequest

官方提供示例组件如下

  • ZookeeperRegisterPlugin
  • EtcdRegisterPlugin
  • ConsulRegisterPlugin
  • MetricsPlugin
  • TracePlugin

ZooKeeperRegisterPlugin

主要运用于Zk上的注册工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//Zookeeper Register Plugin 实现了Zookeeper注册  
type ZooKeeperRegisterPlugin struct {
// 服务地址例如:tcp@127.0.0.1:8972, quic@127.0.0.1:1234
ServiceAddress string
// zk地址
ZooKeeperServers []string
// Prcx Server 基础路径, 例如 com/example/rpcx
BasePath string
Metrics metrics.Registry
// 注册的服务
Services []string
metasLock sync.RWMutex
metas map[string]string
UpdateInterval time.Duration

Options *store.Config
kv store.Store
}
  • Start:设置当前server服务情况并set到zk上相关server_basepath
  • Register:直接将相关server服务set到zk的server_basepath下的znode
  • HandleConnAccept:每次服务调用则调整相关Metrics

Server#Serve[服务监听]

监听主函数入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *Server) Serve(network, address string) (err error) {
var ln net.Listener
ln, err = s.makeListener(network, address)
if err != nil {
return
}

if network == "http" {
s.serveByHTTP(ln, "")
return nil
}

// try to start gateway
ln = s.startGateway(network, ln)

return s.serveListener(ln)
}

此处startGateWay引出http相关服务以及tcp服务的路由功能

Http路由

1
2
3
4
5
6
7
8
9
10
func (s *Server) startHTTP1APIGateway(ln net.Listener) {
router := httprouter.New()
router.POST("/*servicePath", s.handleGatewayRequest)
router.GET("/*servicePath", s.handleGatewayRequest)
router.PUT("/*servicePath", s.handleGatewayRequest)

if err := http.Serve(ln, router); err != nil {
log.Errorf("error in gateway Serve: %s", err)
}
}

Tcp路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
func (m *cMux) Serve() error {
var wg sync.WaitGroup

defer func() {
close(m.donec)
wg.Wait()

for _, sl := range m.sls {
close(sl.l.connc)
// Drain the connections enqueued for the listener.
for c := range sl.l.connc {
_ = c.Close()
}
}
}()

for {
c, err := m.root.Accept()
if err != nil {
if !m.handleErr(err) {
return err
}
continue
}

wg.Add(1)
go m.serve(c, m.donec, &wg)
}
}



func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

muc := newMuxConn(c)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.Conn, muc.startSniffing())
if matched {
muc.doneSniffing()
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Time{})
}
select {
case sl.l.connc <- muc:
case <-donec:
_ = c.Close()
}
return
}
}
}

_ = c.Close()
err := ErrNotMatched{c: c}
if !m.handleErr(err) {
_ = m.root.Close()
}
}




客户端

……

资料

国内经验