kite 源码 -- kite

kite.go 源码大概有 200+ 行

代码组成

kite.go 包含以下公共接口和模块内部成员

  • SetIDL
  • NewAccessLogger
  • NewCallLogger
  • GetLocalIp
  • Init
  • Run
  • WaitSignal
  • MethodContext
  • DefineProcessor
  • Use
  • KiteMW
  • AddMethodMW

内部成员

  • userMW endpoint.Middleware
  • mMap map[string]endpoint.Middleware
  • RPCServer *RPCServer
  • IDLs map[string]string
  • Processor thrfit.TProcessor
  • KiteVersion const string

Init

代码逻辑

  • 如果存在环境变量 GOMAXPROCSMY_CPU_LIMIT 设置 runtime.GOMAXPROCS
    runtime.GOMAXPROCS 为 go 程序所能占用的最大核数,也可以理解为线程数
  • 读取配置,配置优先级
    配置文件 > 环境变量 > 参数
  • 初始化 init Data bus config
  • 初始化 access logger
  • 初始化 kitc 的 call logger
  • 创建 RPCServer 的实例

Run

源码

1
2
3
4
5
6
7
func Run() error {
errch := make(chan error, 1)
go func () {
errch <- server
}()
return waitSignal(errch)
}

逻辑

  • 启动一个 goroutine, 在其中启动 RPCServer.Serve() 服务
  • 等待系统中断信号并退出

waitSignal

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func waitSignal(errCh chan error) error {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)

defer trace.Close()
WaitSignal:
for {
select {
case sig := <- signals:
switch sig {
case syscall.SIGTERM:
StopRegister()
return error.New(sig.String())
case syscall.SIGHUP, syscall.SIGINT:
StopRegister()
RPCServer.Stop()
break WaitSignal
}
case err := <- errch:
StopRegister()
return err
}
}
err := <- errch
if err != nil {
logs.Fatal("KITE: AcceptLoop error: %s\n", err)
}
return err
}

代码逻辑

  • 设置 os 的中断信号由 signals 通道接受
  • 等待系统的中断并处理