搞懂otelgrpc库是如何做到跨程序链路追踪的

基础

grpc是基于http/2.0协议的, otelgrpc库中大量出现metadata可以简单理解成http请求中的header


核心文件

go.opentelemetry.io\contrib\instrumentation\google.golang.org\grpc\otelgrpc@v0.65.0\stats_handler.go

整个trace功能的核心都在这个文件里面.

删除了import和注释, 删除了无用函数. 用$1 $2代表参数1 参数2

// TagRPC 给每一次rpc调用附加信息
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
    // 提取info.FullMethodName (/GreetService/SayHello)
    // name是判断FullMethodName如果第一个字符是/就去掉第一个字符返回, 不是直接返回FullMethodName. 第二个返回是[]attribute.KeyValue{semconv.RPCMethod(name)}
    // 路径: go.opentelemetry.io\contrib\instrumentation\google.golang.org\grpc\otelgrpc@v0.65.0\internal\parse.go:20
    // name: GreetService/SayHello
    // attrs: 有一个key为rpc.method的值
    name, attrs := internal.ParseFullMethod(info.FullMethodName)
    
    // attrs类型attribute.KeyValue类型, 一个key可以有多个value. 这里用semconv.RPCSystemNameGRPC再添加rpc.system.name的key
    attrs = append(attrs, semconv.RPCSystemNameGRPC)

    if record {
        // 创建一个新的切片, 以避免错误识别到metrics所使用的相同attrs切片中 
        spanAttributes := make([]attribute.KeyValue, 0, len(attrs)+len(h.SpanAttributes))
        spanAttributes = append(append(spanAttributes, attrs...), h.SpanAttributes...)
        ctx, _ = h.tracer.Start(
            ctx,
            name,
            trace.WithSpanKind(trace.SpanKindClient),
            trace.WithAttributes(spanAttributes...),
        )
    }

    // 1. inject内部用metadata获取$1 ctx的内容.
    // 2. 把上面h.tracer.Start创建的spanid、traceid注入到propagators.
    // 3. 以key为traceparent, value为00-{traceID}-{spanID}-01 格式来填入.
    // 4. 用metadata再把md包装一下返回ctx
    return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.Propagators)
}

先来总结一下TagRPC做了哪些事情吧

  1. 用pb里面定义的service名拼接调用的rpc函数名. 如果第一个是/就去掉赋值给name, 然后给attrs赋值一个key是rpc.method value是未去除/的name
  2. 给attrs添加key是rpc.system.name value是grpc
  3. 判断filter是否为空, 不为空用filter过滤一下info
  4. 创建一个新的spanAttrs避免和metrics冲突
  5. 创建一个子span
  6. metrics代码这里跳过...
  7. 把子span返回的ctx包裹上gctx, 然后提取内容注入给全局propagator(如果没有应当会新建一个)
  8. 结束, 执行HandleRPC

核心就是inject添加了tranceparent的metadata. 然后在服务端接收, 然后otel.GetTextMapPropagator().Extract() 提取传递来的span, 他会自动new成一个trace.SpanContext返回, 然后需要手动trace.SpanContextFromContext(ctx) 提取成span. 进而就可以使用tracer.Start() 开启一个子span.

客户端和服务端均需连接到otel. jaeger依靠traceID、parentID、spanID来创建一个trace. 你可以尝试给客户端的end前sleep一下, 会发现服务端的span已经到jaeger而没有客户端的span. sleep结束后jaeger自动合并两个span到一个trace.

tranceparent传递格式: 00-{traceID}-{spanID}-01

最后的01翻看Extract好像是采样有关的. 断点+翻代码+搜索搞了一天, 懒得再研究了, 相关内容太少了, 而且文档也不详细.


简单实现

服务端

package main

import (
    pb "alitracing/client/proto"
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "log"
    "net"
    "time"
)

func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
    exporter, err := jaeger.New(
        jaeger.WithCollectorEndpoint(
            jaeger.WithEndpoint("http://127.0.0.1:14268/api/traces"),
        ),
    )
    if err != nil {
        return nil, err
    }
    log.Printf("[%s] Jaeger 导出器初始化成功", serviceName)

    // 创建资源属性
    res, err := resource.New(
        context.Background(),
        resource.WithAttributes(
            semconv.ServiceNameKey.String(serviceName),
            semconv.ServiceVersionKey.String("1.0.0"),
            attribute.String("environment", "dev"),
        ),
    )
    if err != nil {
        return nil, err
    }
    
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithSyncer(exporter),
        sdktrace.WithResource(res),
    )

    // 设置全局 TracerProvider 和 Propagator
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))
    log.Printf("[%s] OTel 初始化完成", serviceName)

    return tp, nil
}

type metadataCarrier struct {
    md metadata.MD
}

func (c *metadataCarrier) Get(key string) string {
    values := c.md.Get(key)
    if len(values) == 0 {
        return ""
    }
    return values[0]
}

func (c *metadataCarrier) Set(key, value string) {
    c.md.Set(key, value)
}

func (c *metadataCarrier) Keys() []string {
    keys := make([]string, 0, len(c.md))
    for k := range c.md {
        keys = append(keys, k)
    }
    return keys
}

type server struct {
    pb.UnimplementedGreetServiceServer
    tracer trace.Tracer
}

func (s *server) SayHello(ctx context.Context, req *pb.SayHelloReq) (*pb.SayHelloResp, error) {
    log.Println("[服务端] 收到请求,开始提取 Trace 上下文")

    // 提取入站 Metadata
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        md = metadata.New(nil)
        log.Println("[服务端] 未提取到入站 Metadata,使用空 Metadata")
    } else {
        log.Printf("[服务端] 提取到入站 Metadata: %v", md)
    }

    // 从 Metadata 提取远程 SpanContext
    carrier := &metadataCarrier{md: md}
    ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)

    // 验证提取到的 SpanContext
    extractedSC := trace.SpanContextFromContext(ctx)
    if extractedSC.IsValid() && extractedSC.IsRemote() {
        log.Printf("[服务端] 成功提取远程 SpanContext: TraceID=%s, SpanID=%s", extractedSC.TraceID(), extractedSC.SpanID())
    } else {
        log.Println("[服务端] 未提取到有效的远程 SpanContext,将创建新的根 Span")
    }

    // 创建服务端 Span

    ctx, span := s.tracer.Start(
        ctx,
        "Greeter/SayHello",
        trace.WithAttributes(
            semconv.RPCMethodKey.String("SayHello"),
            semconv.RPCServiceKey.String("Greeter"),
            attribute.String("request.name", req.Name),
        ),
    )
    defer func() {
        span.End()
        log.Printf("[服务端] Span 结束: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())
    }()
    time.Sleep(time.Second)
    // 业务逻辑
    return &pb.SayHelloResp{Msg: "Hello " + req.Name}, nil
}

func main() {
    tp, err := initTracer("grpc-server")
    if err != nil {
        log.Fatalf("初始化 OTel 失败: %v", err)
    }
    defer func() { _ = tp.Shutdown(context.Background()) }()

    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("监听失败: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreetServiceServer(s, &server{tracer: otel.Tracer("grpc-server-tracer")})
    log.Println("服务端启动在 :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("服务启动失败: %v", err)
    }
    time.Sleep(time.Hour)
}

客户端

package main

import (
    pb "alitracing/client/proto"
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "log"
    "time"
)

func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
    exporter, err := jaeger.New(
        jaeger.WithCollectorEndpoint(
            jaeger.WithEndpoint("http://127.0.0.1:14268/api/traces"),
        ),
    )
    if err != nil {
        return nil, err
    }
    log.Printf("[%s] Jaeger 导出器初始化成功", serviceName)

    // 创建资源属性
    res, err := resource.New(
        context.Background(),
        resource.WithAttributes(
            semconv.ServiceNameKey.String(serviceName),
            semconv.ServiceVersionKey.String("1.0.0"),
            attribute.String("environment", "dev"),
        ),
    )
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithSyncer(exporter),
        sdktrace.WithResource(res),
    )

    // 设置全局 TracerProvider 和 Propagator
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))
    log.Printf("[%s] OTel 初始化完成", serviceName)

    return tp, nil
}

type metadataCarrier struct {
    md metadata.MD
}

func (c *metadataCarrier) Get(key string) string {
    values := c.md.Get(key)
    if len(values) == 0 {
        return ""
    }
    return values[0]
}

func (c *metadataCarrier) Set(key, value string) {
    c.md.Set(key, value)
}

func (c *metadataCarrier) Keys() []string {
    keys := make([]string, 0, len(c.md))
    for k := range c.md {
        keys = append(keys, k)
    }
    return keys
}

func main() {
    time.Sleep(1 * time.Second)

    tp, err := initTracer("grpc-client")
    if err != nil {
        log.Fatalf("初始化 OTel 失败: %v", err)
    }
    defer func() {
        // 确保 TracerProvider 关闭,Flush 剩余 Span
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := tp.Shutdown(ctx); err != nil {
            log.Printf("关闭 TracerProvider 失败: %v", err)
        }
        log.Println("[客户端] TracerProvider 已关闭")
    }()
    tracer := otel.Tracer("grpc-client-tracer")

    conn, err := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("连接失败: %v", err)
    }
    defer conn.Close()
    client := pb.NewGreetServiceClient(conn)

    // 创建客户端根 Span
    ctx, span := tracer.Start(
        context.Background(),
        "ClientCall",
        trace.WithAttributes(
            semconv.RPCMethodKey.String("SayHello"),
            semconv.RPCServiceKey.String("Greeter"),
        ),
    )
    defer func() {
        span.End()
        log.Printf("[客户端] Span 结束: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())
    }()
    log.Printf("[客户端] 创建客户端 Span: TraceID=%s, SpanID=%s", span.SpanContext().TraceID(), span.SpanContext().SpanID())

    // 注入 SpanContext 到 Metadata
    md := metadata.New(nil)
    carrier := &metadataCarrier{md: md}
    otel.GetTextMapPropagator().Inject(ctx, carrier)
    log.Printf("[客户端] 注入 Trace 上下文到 Metadata: %v", md)

    // 发起 RPC
    ctx = metadata.NewOutgoingContext(ctx, md)
    resp, err := client.SayHello(ctx, &pb.SayHelloReq{Name: "Alice"})
    if err != nil {
        log.Fatalf("调用失败: %v", err)
    }
    log.Printf("[客户端] 收到响应: %s", resp.Msg)

    time.Sleep(2 * time.Second)
}

作者bb

我将永远谴责文档写的不全的人. 同时谴责只会照搬example 发博客的啥X.


感谢

[]: https://www.doubao.com/chat/ "豆包"


别看了下面代码跟传递span无关

下面来看HandleRPC函数

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
    // 这里直接调用的下面handleRPC省略了
}

type int64Hist interface {
    RecordSet(context.Context, int64, attribute.Set)
}

func (c *config) handleRPC(
    ctx context.Context,
    rs stats.RPCStats,
    duration metric.Float64Histogram,
    inSize, outSize int64Hist,
    recordStatus func(*status.Status) (codes.Code, string),
) {
    // 从之前TagRPC里拿出来metrics的东西
    gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
   
    if gctx != nil && !gctx.record {
        // 这里似乎一直不会执行, 因为TagRPC中record一直为true, 没有看到有其他更改的地方
        return
    }
    
    // 从ctx里面拿出TagRPC里创建的span
    span := trace.SpanFromContext(ctx)
    
    // 默认为0
    var messageId int64

    // 这里用switch判断rs的六个状态执行不同代码, 在下面进行解释
    switch rs := rs.(type)
        

在断点时一共有7次断点,下面列出每次断点rs的类型.

  1. stats.Begin 未进入switch???
  2. stats.DelayedPickComplete 未进入switch
  3. stats.OutHeader 进入case

    // 如果span活跃且可以记录事件时, 返回true
    if span.IsRecording() {
        // 从ctx获取对方节点信息, 即服务端 IP+PORT
        if p, ok := peer.FromContext(ctx); ok {
            // p.Addr.String() : 127.0.0.1:8085
            // serverAddrAttrs(p.Addr.String()) : attribute.KeyValue类型
            //         k1: server.address v1: 127.0.0.1
            //         k2: server.port       v2: 8085
            // 最后给span设置attrs如果k存在则覆写
            span.SetAttributes(serverAddrAttrs(p.Addr.String())...)
        }
    }
  4. stats.OutPayload 进入case

    // metrics处理
    if gctx != nil {
        messageId = atomic.AddInt64(&gctx.outMessages, 1)
        outSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet)
    }
    
    // c.SentEvent为false 没进入
    if c.SentEvent && span.IsRecording() {
        span.AddEvent("message",
            trace.WithAttributes(
                semconv.RPCMessageTypeSent,
                semconv.RPCMessageIDKey.Int64(messageId),
                semconv.RPCMessageCompressedSizeKey.Int(rs.CompressedLength),
                semconv.RPCMessageUncompressedSizeKey.Int(rs.Length),
            ),
        )
    }
  5. InHeader 未进入
  6. Intrailer 未进入
  7. InPayload 进入case

    if gctx != nil {
        messageId = atomic.AddInt64(&gctx.inMessages, 1)
        inSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet)
    }
    
    if c.ReceivedEvent && span.IsRecording() {
        span.AddEvent("message",
            trace.WithAttributes(
                semconv.RPCMessageTypeReceived,
                semconv.RPCMessageIDKey.Int64(messageId),
                semconv.RPCMessageCompressedSizeKey.Int(rs.CompressedLength),
                semconv.RPCMessageUncompressedSizeKey.Int(rs.Length),
            ),
        )
    }
  8. End 进入case

    var rpcStatusAttr attribute.KeyValue
    
    var s *status.Status
    
    // 如果响应错误就用远程错误码赋值rpcStatusAttr, 否则用grpc_codes.OK.String()赋值
    if rs.Error != nil {
        s, _ = status.FromError(rs.Error)
        rpcStatusAttr = semconv.RPCResponseStatusCode(s.Code().String())
    } else {
        rpcStatusAttr = semconv.RPCResponseStatusCode(grpc_codes.OK.String())
    }
    
    // 如果span还能记录, 把前面的rpcStatusAttr 设置给span 然后关闭span
    if span.IsRecording() {
        if s != nil {
            c, m := recordStatus(s)
            span.SetStatus(c, m)
        }
        span.SetAttributes(rpcStatusAttr)
        // 结束会自动赋值endTime
        span.End()
    }
    
    // metrics, 这里略过
    var metricAttrs []attribute.KeyValue
    if gctx != nil {
        metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
        metricAttrs = append(metricAttrs, gctx.metricAttrs...)
    }
    metricAttrs = append(metricAttrs, rpcStatusAttr)
    recordOpts := []metric.RecordOption{metric.WithAttributeSet(attribute.NewSet(metricAttrs...))}
    
    // 与上面span无关
    elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)
    
    duration.Record(ctx, elapsedTime, recordOpts...)

我来吐槽

*