基础
grpc是基于http/2.0协议的, otelgrpc库中大量出现metadata可以简单理解成http请求中的header
核心文件
go.opentelemetry.io\contrib\instrumentation\google.golang.org\grpc\otelgrpc@v0.65.0\stats_handler.go
整个trace功能的核心都在这个文件里面.
删除了import和注释, 删除了无用函数. 用$1 $2代表参数1 参数2
// TagRPC 给每一次rpc调用附加信息
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// 提取info.FullMethodName (/GreetService/SayHello)
// name是判断FullMethodName如果第一个字符是/就去掉第一个字符返回, 不是直接返回FullMethodName. 第二个返回是[]attribute.KeyValue{semconv.RPCMethod(name)}
// 路径: go.opentelemetry.io\contrib\instrumentation\google.golang.org\grpc\otelgrpc@v0.65.0\internal\parse.go:20
// name: GreetService/SayHello
// attrs: 有一个key为rpc.method的值
name, attrs := internal.ParseFullMethod(info.FullMethodName)
// attrs类型attribute.KeyValue类型, 一个key可以有多个value. 这里用semconv.RPCSystemNameGRPC再添加rpc.system.name的key
attrs = append(attrs, semconv.RPCSystemNameGRPC)
if record {
// 创建一个新的切片, 以避免错误识别到metrics所使用的相同attrs切片中
spanAttributes := make([]attribute.KeyValue, 0, len(attrs)+len(h.SpanAttributes))
spanAttributes = append(append(spanAttributes, attrs...), h.SpanAttributes...)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(spanAttributes...),
)
}
// 1. inject内部用metadata获取$1 ctx的内容.
// 2. 把上面h.tracer.Start创建的spanid、traceid注入到propagators.
// 3. 以key为traceparent, value为00-{traceID}-{spanID}-01 格式来填入.
// 4. 用metadata再把md包装一下返回ctx
return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.Propagators)
}
先来总结一下TagRPC做了哪些事情吧
- 用pb里面定义的service名拼接调用的rpc函数名. 如果第一个是/就去掉赋值给name, 然后给attrs赋值一个key是rpc.method value是未去除/的name
- 给attrs添加key是rpc.system.name value是grpc
- 判断filter是否为空, 不为空用filter过滤一下info
- 创建一个新的spanAttrs避免和metrics冲突
- 创建一个子span
- metrics代码这里跳过...
- 把子span返回的ctx包裹上gctx, 然后提取内容注入给全局propagator(如果没有应当会新建一个)
- 结束, 执行HandleRPC
核心就是inject添加了tranceparent的metadata. 然后在服务端接收, 然后otel.GetTextMapPropagator().Extract() 提取传递来的span, 他会自动new成一个trace.SpanContext返回, 然后需要手动trace.SpanContextFromContext(ctx) 提取成span. 进而就可以使用tracer.Start() 开启一个子span.
客户端和服务端均需连接到otel. jaeger依靠traceID、parentID、spanID来创建一个trace. 你可以尝试给客户端的end前sleep一下, 会发现服务端的span已经到jaeger而没有客户端的span. sleep结束后jaeger自动合并两个span到一个trace.
tranceparent传递格式: 00-{traceID}-{spanID}-01
最后的01翻看Extract好像是采样有关的. 断点+翻代码+搜索搞了一天, 懒得再研究了, 相关内容太少了, 而且文档也不详细.
简单实现
服务端
package main
import (
pb "alitracing/client/proto"
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
"net"
"time"
)
func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
exporter, err := jaeger.New(
jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint("http://127.0.0.1:14268/api/traces"),
),
)
if err != nil {
return nil, err
}
log.Printf("[%s] Jaeger 导出器初始化成功", serviceName)
// 创建资源属性
res, err := resource.New(
context.Background(),
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String("1.0.0"),
attribute.String("environment", "dev"),
),
)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exporter),
sdktrace.WithResource(res),
)
// 设置全局 TracerProvider 和 Propagator
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
log.Printf("[%s] OTel 初始化完成", serviceName)
return tp, nil
}
type metadataCarrier struct {
md metadata.MD
}
func (c *metadataCarrier) Get(key string) string {
values := c.md.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}
func (c *metadataCarrier) Set(key, value string) {
c.md.Set(key, value)
}
func (c *metadataCarrier) Keys() []string {
keys := make([]string, 0, len(c.md))
for k := range c.md {
keys = append(keys, k)
}
return keys
}
type server struct {
pb.UnimplementedGreetServiceServer
tracer trace.Tracer
}
func (s *server) SayHello(ctx context.Context, req *pb.SayHelloReq) (*pb.SayHelloResp, error) {
log.Println("[服务端] 收到请求,开始提取 Trace 上下文")
// 提取入站 Metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
log.Println("[服务端] 未提取到入站 Metadata,使用空 Metadata")
} else {
log.Printf("[服务端] 提取到入站 Metadata: %v", md)
}
// 从 Metadata 提取远程 SpanContext
carrier := &metadataCarrier{md: md}
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
// 验证提取到的 SpanContext
extractedSC := trace.SpanContextFromContext(ctx)
if extractedSC.IsValid() && extractedSC.IsRemote() {
log.Printf("[服务端] 成功提取远程 SpanContext: TraceID=%s, SpanID=%s", extractedSC.TraceID(), extractedSC.SpanID())
} else {
log.Println("[服务端] 未提取到有效的远程 SpanContext,将创建新的根 Span")
}
// 创建服务端 Span
ctx, span := s.tracer.Start(
ctx,
"Greeter/SayHello",
trace.WithAttributes(
semconv.RPCMethodKey.String("SayHello"),
semconv.RPCServiceKey.String("Greeter"),
attribute.String("request.name", req.Name),
),
)
defer func() {
span.End()
log.Printf("[服务端] Span 结束: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())
}()
time.Sleep(time.Second)
// 业务逻辑
return &pb.SayHelloResp{Msg: "Hello " + req.Name}, nil
}
func main() {
tp, err := initTracer("grpc-server")
if err != nil {
log.Fatalf("初始化 OTel 失败: %v", err)
}
defer func() { _ = tp.Shutdown(context.Background()) }()
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreetServiceServer(s, &server{tracer: otel.Tracer("grpc-server-tracer")})
log.Println("服务端启动在 :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("服务启动失败: %v", err)
}
time.Sleep(time.Hour)
}
客户端
package main
import (
pb "alitracing/client/proto"
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"log"
"time"
)
func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
exporter, err := jaeger.New(
jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint("http://127.0.0.1:14268/api/traces"),
),
)
if err != nil {
return nil, err
}
log.Printf("[%s] Jaeger 导出器初始化成功", serviceName)
// 创建资源属性
res, err := resource.New(
context.Background(),
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String("1.0.0"),
attribute.String("environment", "dev"),
),
)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exporter),
sdktrace.WithResource(res),
)
// 设置全局 TracerProvider 和 Propagator
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
log.Printf("[%s] OTel 初始化完成", serviceName)
return tp, nil
}
type metadataCarrier struct {
md metadata.MD
}
func (c *metadataCarrier) Get(key string) string {
values := c.md.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}
func (c *metadataCarrier) Set(key, value string) {
c.md.Set(key, value)
}
func (c *metadataCarrier) Keys() []string {
keys := make([]string, 0, len(c.md))
for k := range c.md {
keys = append(keys, k)
}
return keys
}
func main() {
time.Sleep(1 * time.Second)
tp, err := initTracer("grpc-client")
if err != nil {
log.Fatalf("初始化 OTel 失败: %v", err)
}
defer func() {
// 确保 TracerProvider 关闭,Flush 剩余 Span
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
log.Printf("关闭 TracerProvider 失败: %v", err)
}
log.Println("[客户端] TracerProvider 已关闭")
}()
tracer := otel.Tracer("grpc-client-tracer")
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewGreetServiceClient(conn)
// 创建客户端根 Span
ctx, span := tracer.Start(
context.Background(),
"ClientCall",
trace.WithAttributes(
semconv.RPCMethodKey.String("SayHello"),
semconv.RPCServiceKey.String("Greeter"),
),
)
defer func() {
span.End()
log.Printf("[客户端] Span 结束: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())
}()
log.Printf("[客户端] 创建客户端 Span: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())
// 注入 SpanContext 到 Metadata
md := metadata.New(nil)
carrier := &metadataCarrier{md: md}
otel.GetTextMapPropagator().Inject(ctx, carrier)
log.Printf("[客户端] 注入 Trace 上下文到 Metadata: %v", md)
// 发起 RPC
ctx = metadata.NewOutgoingContext(ctx, md)
resp, err := client.SayHello(ctx, &pb.SayHelloReq{Name: "Alice"})
if err != nil {
log.Fatalf("调用失败: %v", err)
}
log.Printf("[客户端] 收到响应: %s", resp.Msg)
time.Sleep(2 * time.Second)
}
作者bb
我将永远谴责文档写的不全的人. 同时谴责只会照搬example 发博客的啥X.
感谢
[]: https://www.doubao.com/chat/ "豆包"
别看了下面代码跟传递span无关
下面来看HandleRPC函数
// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// 这里直接调用的下面handleRPC省略了
}
type int64Hist interface {
RecordSet(context.Context, int64, attribute.Set)
}
func (c *config) handleRPC(
ctx context.Context,
rs stats.RPCStats,
duration metric.Float64Histogram,
inSize, outSize int64Hist,
recordStatus func(*status.Status) (codes.Code, string),
) {
// 从之前TagRPC里拿出来metrics的东西
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if gctx != nil && !gctx.record {
// 这里似乎一直不会执行, 因为TagRPC中record一直为true, 没有看到有其他更改的地方
return
}
// 从ctx里面拿出TagRPC里创建的span
span := trace.SpanFromContext(ctx)
// 默认为0
var messageId int64
// 这里用switch判断rs的六个状态执行不同代码, 在下面进行解释
switch rs := rs.(type)
在断点时一共有7次断点,下面列出每次断点rs的类型.
- stats.Begin 未进入switch???
- stats.DelayedPickComplete 未进入switch
stats.OutHeader 进入case
// 如果span活跃且可以记录事件时, 返回true if span.IsRecording() { // 从ctx获取对方节点信息, 即服务端 IP+PORT if p, ok := peer.FromContext(ctx); ok { // p.Addr.String() : 127.0.0.1:8085 // serverAddrAttrs(p.Addr.String()) : attribute.KeyValue类型 // k1: server.address v1: 127.0.0.1 // k2: server.port v2: 8085 // 最后给span设置attrs如果k存在则覆写 span.SetAttributes(serverAddrAttrs(p.Addr.String())...) } }stats.OutPayload 进入case
// metrics处理 if gctx != nil { messageId = atomic.AddInt64(&gctx.outMessages, 1) outSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet) } // c.SentEvent为false 没进入 if c.SentEvent && span.IsRecording() { span.AddEvent("message", trace.WithAttributes( semconv.RPCMessageTypeSent, semconv.RPCMessageIDKey.Int64(messageId), semconv.RPCMessageCompressedSizeKey.Int(rs.CompressedLength), semconv.RPCMessageUncompressedSizeKey.Int(rs.Length), ), ) }- InHeader 未进入
- Intrailer 未进入
InPayload 进入case
if gctx != nil { messageId = atomic.AddInt64(&gctx.inMessages, 1) inSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet) } if c.ReceivedEvent && span.IsRecording() { span.AddEvent("message", trace.WithAttributes( semconv.RPCMessageTypeReceived, semconv.RPCMessageIDKey.Int64(messageId), semconv.RPCMessageCompressedSizeKey.Int(rs.CompressedLength), semconv.RPCMessageUncompressedSizeKey.Int(rs.Length), ), ) }End 进入case
var rpcStatusAttr attribute.KeyValue var s *status.Status // 如果响应错误就用远程错误码赋值rpcStatusAttr, 否则用grpc_codes.OK.String()赋值 if rs.Error != nil { s, _ = status.FromError(rs.Error) rpcStatusAttr = semconv.RPCResponseStatusCode(s.Code().String()) } else { rpcStatusAttr = semconv.RPCResponseStatusCode(grpc_codes.OK.String()) } // 如果span还能记录, 把前面的rpcStatusAttr 设置给span 然后关闭span if span.IsRecording() { if s != nil { c, m := recordStatus(s) span.SetStatus(c, m) } span.SetAttributes(rpcStatusAttr) // 结束会自动赋值endTime span.End() } // metrics, 这里略过 var metricAttrs []attribute.KeyValue if gctx != nil { metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) metricAttrs = append(metricAttrs, gctx.metricAttrs...) } metricAttrs = append(metricAttrs, rpcStatusAttr) recordOpts := []metric.RecordOption{metric.WithAttributeSet(attribute.NewSet(metricAttrs...))} // 与上面span无关 elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond) duration.Record(ctx, elapsedTime, recordOpts...)