# OpenTelemetry

OpenTelemetry (OTel) 是一个开源可观测性框架,可允许开发团队以统一的单一格式生成、处理和传输遥测数据。它是由云原生计算基金会 (CNCF (opens new window)) 开发的,旨在提供标准协议和工具,以便收集指标、日志和跟踪并将其发送到监测平台。

OpenTelemetry 提供独立于供应商的 SDK、API 和工具,以便您可以将数据发送到任何可观测性后端进行分析。

OpenTelemetry (opens new window) 正在快速成为云原生应用程序领域内主导的可观测性遥测数据标准。如果组织想要做好准备以满足未来的数据需求,而且不想被锁定到某一特定供应商,也不想受限于其既有技术,那么采用 OpenTelemetry 对其至为关键。

# 架构

alt text

由于OpenTelemetry旨在成为一个为厂商和可观察性后端提供的跨语言框架,因此它非常灵活且可扩展,但同时也很复杂。OpenTelemetry的默认实现中,其架构可以分为如下三部分:

  1. OpenTelemetry API
  2. OpenTelemetry SDK,包括
    • Tracer pipeline
    • Meter pipeline
    • Shared Context layer
  3. Collector

对于数据追踪的需求,使用 Tracer Pipeline 的功能即可。

# 概念

  • trace:一个 trace 对应一整条调用链路,包括从客户端到服务端、服务端内部调用 db 和 redis,服务端调用其它微服务、服务端返回结果,等等。
  • span:一个 span 代表一段连续的过程,比如客户端开始调用服务端到接收服务端结束可以是一个 span。
    • span 的生命周期常常是函数开始时创建,函数结束时停止。
    • 各 span 可以是重叠的(如客户端调用服务端的 span 和服务端处理的 span 是重合的,如下图),也可以是不重叠的(服务 A 处理完数据,将数据存到 db;一分钟后服务 B 从 db 取出数据进行下一步,这两个 span 没有重叠)
    • 一个 trace 链路上可以有一个或多个 span。

alt text

# Tracer 示例

# 创建 span

Go 里只需要将 ctx 包裹一层 newSpan,就可以记录函数的调用时间等信息。

// GetUser retrieves and returns hard coded user data for demonstration.
func GetUser(ctx context.Context, id int) g.Map {
        ctx, span := gtrace.NewSpan(ctx, "GetUser")
        defer span.End()
        m := g.Map{}
        gutil.MapMerge(
                m,
                GetInfo(ctx, id),
                GetDetail(ctx, id),
                GetScores(ctx, id),
        )
        return m
}

// GetInfo retrieves and returns hard coded user info for demonstration.
func GetInfo(ctx context.Context, id int) g.Map {
        ctx, span := gtrace.NewSpan(ctx, "GetInfo")
        defer span.End()
        if id == 100 {
                return g.Map{
                        "id":     100,
                        "name":   "john",
                        "gender": 1,
                }
        }
        return nil
}

Python 里也类似,也是把 trace 调用链路放到 context 里。不过 Python 不需要显式传递 context,因为 opentelemetry 基于 Python 的 contextvars 模块来实现隐式传递 context。

from opentelemetry import trace
from opentelemetry.trace import Tracer

tracer = trace.get_tracer(__name__)
def get_user(id: int) -> dict:
    with tracer.start_as_current_span("get_user") as span:
        user = {}
        user.update(get_info(id))
        user.update(get_detail(id))
        user.update(get_scores(id))
        return user
def get_info(id: int) -> dict:
    with tracer.start_as_current_span("get_info") as span:
        if id == 100:
            return {
                "id": 100,
                "name": "john",
                "gender": 1,
            }
        return None

# 在 ORM、缓存层启用追踪

只要 ORM、缓存层有对应的 OpenTelemetry 包,安装库并在代码中完成初始化以后,ORM 和缓存层在识别到存在 trace 上下文后,就会记录信息并发送,无需修改业务逻辑。 alt text

常用的中间件都有 OpenTelemetry 的支持:

Python 也有类似的中间件,在 OpenTelemetry-Python-Contrib — OpenTelemetry Python Contrib documentation (opens new window) 有官方对大多数 Python 框架和库的支持,如 Django、Flask、FastAPI、requests、aiohttp、Redis、kafka、SQLAlchemy、Psycopg、PyMySQL、pymongo 等。

# TraceID 注入和获取

在链路跟踪中,TraceID 作为在各个服务间传递的唯一标识,用于串联服务间请求关联关系,是非常重要的一项数据。

# 单一应用里的 TraceID 流转:context

在单一的应用里,TraceID 常常放在 context 里,在函数中进行流转。在输出日志时也需要将 TraceID 一并输出,方便追查数据流转。

# 应用之间的 TraceID 流转:traceparent header & Propagator

对于应用之间的 TraceID 流转,W3C 定义了 traceparent header (opens new window) 字段。这个字段由四个部分拼接而成:

  • version
  • trace-id
  • parent-id
  • trace-flags alt text 通常约定,Traceparent 在 HTTP 协议中放在 HTTP Header 中传输,在 GRPC 协议中放在 Metadata 里传输。 基于这个约定:
  • 客户端:在客户端发起请求时,需要向请求头注入traceparent。如果客户端请求时的 context 已经有 TraceID 和 SpanID,就从 context 读出来拼成 traceparent 注入到 header 或 metadata。如果没有就不注入。
  • 在服务端接收请求时,需要从请求的 header 或 metadata 获取 traceparent、解析 traceID,然后注入到 context 里。如果请求头中没有 TraceID,则会新生成一个 TraceID 并注入到 context 里。

上述服务端或客户端如果没有自带 TraceID 的生成、注入和获取的中间件,则可以自己花十几行代码调用 OpenTelemetry 自带的 Propagator 实现一个中间件。下面以 Go 为示例:

import (
  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/propagation"
)

// 服务端/客户端启动时设置全局 Propagator
propagator := propagation.TraceContext{}
otel.SetTextMapPropagator(propagator)

// 客户端每次请求时调用 propagator 将 traceparent 注入 HTTP header
propagator := otel.GetTextMapPropagator()
propagator.Inject(req.Context(), propagation.HeaderCarrier(req.Header))

// 服务端每次请求时调用 propagator 从 HTTP Header 提取 traceparent
propagator := otel.GetTextMapPropagator()
ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
TraceContextPropagator.Inject() 的原理基本如下:
// 从请求上下文中获取当前的 Span
span := trace.SpanFromContext(req.Context())
spanContext := span.SpanContext()

if spanContext.IsValid() {
    // 构建 traceparent header
    traceparent := fmt.Sprintf("00-%s-%s-%02s",
        spanContext.TraceID(),
        spanContext.SpanID(),
        spanContext.TraceFlags()&trace.FlagsSampled,
    )
    
    // 在请求头中添加 traceparent
    req.Header.Set("traceparent", traceparent)
}

可见,如果按照上述的约定传递 traceID,就可以直接使用自带的 TraceContextPropagator 来完成 traceID 的注入和提取。

如果 traceID 的传递不是按照上面的约定,依然可以自己实现一个 Propagator 来完成 traceID 的注入和提取。

reference https://www.elastic.co/cn/what-is/opentelemetry https://goframe.org/pages/viewpage.action?pageId=148537351 https://go-kratos.dev/en/docs/component/middleware/tracing/

代码仓库:https://gitlab.carizon.work/IS1IICO/go-kratos-example

# 实测结论

  • Kratos 服务端、客户端:GRPC/HTTP 均包含 tracing 中间件可以直接使用。(10/10)
  • net/http 客户端、服务端:可以基于现成的 Propagator 实现一个中间件。(9/10)
  • Google GRPC 客户端、服务端:需要实现一个对 metadata 注入和提取字段的 TextMapCarrier,然后基于现成的 Propagator 实现中间件。(8/10)
  • Python Flask HTTP 客户端:包含 tracing 中间件可以直接使用。(10/10)

# 部署 jaeger

Jaeger 是 tracing 的后端,用于接收整个链路的 tracing 请求。

部署测试 jaeger-all-in-one:https://www.jaegertracing.io/docs/1.62/getting-started/

docker run -d --name jaeger \
  -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
  -p 6831:6831/udp \
  -p 6832:6832/udp \
  -p 5778:5778 \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  -p 14250:14250 \
  -p 14268:14268 \
  -p 14269:14269 \
  -p 9411:9411 \
  jaegertracing/all-in-one:1.62.0

# 实测过程

# Kratos 服务端创建 trace

按照 https://go-kratos.dev/en/docs/component/middleware/tracing/ 在 Kratos 初始化 HTTPServer 和 GRPCServer 时初始化 Tracer。

package utils

import (
  "context"
  "time"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
  "go.opentelemetry.io/otel/sdk/resource"
  tracesdk "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
)

// https://go-kratos.dev/en/docs/component/middleware/tracing/
// Set global trace provider
func InitTracer(endpoint string, serviceName string, nodeType string) (*tracesdk.TracerProvider, error) {
  // 创建 OTLP exporter
  conn, err := grpc.NewClient(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    return nil, err
  }
  exp, err := otlptracegrpc.New(
    context.Background(),
    otlptracegrpc.WithGRPCConn(conn),
    otlptracegrpc.WithTimeout(time.Second),
  )
  if err != nil {
    return nil, err
  }

  tp := tracesdk.NewTracerProvider(
    // 将基于父span的采样率设置为100%
    tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))),
    tracesdk.WithSyncer(exp),
    // 在资源中记录有关此应用程序的信息
    tracesdk.WithResource(resource.NewSchemaless(
      semconv.ServiceNameKey.String(serviceName),
      attribute.String("exporter", "jaeger"),
      attribute.String("node", nodeType),
    )),
  )
  otel.SetTracerProvider(tp)
  return tp, nil
}
// NewGRPCServer new a gRPC server.
func NewGRPCServer(c *conf.Server, greeter *service.GreeterService, logger log.Logger) *grpc.Server {
  _, err := utils.InitTracer("localhost:4317", "kratos-server", "client")
  if err != nil {
    panic(err)
  }
  var opts = []grpc.ServerOption{
    grpc.Middleware(
      recovery.Recovery(),
      tracing.Server(),
    ),
  }
  if c.Grpc.Network != "" {
    opts = append(opts, grpc.Network(c.Grpc.Network))
  }
  if c.Grpc.Addr != "" {
    opts = append(opts, grpc.Address(c.Grpc.Addr))
  }
  if c.Grpc.Timeout != nil {
    opts = append(opts, grpc.Timeout(c.Grpc.Timeout.AsDuration()))
  }
  srv := grpc.NewServer(opts...)
  v1.RegisterGreeterServer(srv, greeter)
  return srv
}

重新运行后多了 traceId 和 spanId。

alt text

访问 jaeger http://localhost:16686/ 即可看到该 trace 和 span。

alt text

# Kratos GRPC 客户端创建 trace

如果是使用 kratos 的客户端 github.com/go-kratos/kratos/v2/transport/grpc,请求的时候就会带上 traceparent,服务端能够直接解析出同一个 traceId。

func grpcCli() (*googlegrpc.ClientConn, error) {
  if err != nil {
    log.Fatalf("Failed to init tracer: %v", err)
  }
  return grpc.DialInsecure(
    context.Background(),
    grpc.WithEndpoint("localhost:9000"),
    grpc.WithMiddleware(
      tracing.Client(),
    ),
  )
}

func main() {
  _, err := utils.InitTracer("localhost:4317", "kratos_grpc_client", "client")
  if err != nil {
    panic(err)
  }
  conn, err := grpcCli()
  if err != nil {
    log.Fatalf("Failed to connect: %v", err)
  }
  defer conn.Close()

  // 创建 gRPC 客户端
  client := pb.NewGreeterClient(conn)

  ctx := context.Background()
  // 调用 gRPC 方法
  response, err := client.SayHello(ctx, &pb.HelloRequest{Name: "Kratos"})
  if err != nil {
    log.Fatalf("Could not greet: %v", err)
  }

  log.Printf("Greeting: %s", response.Message)
}

alt text 访问 jaeger http://localhost:16686/ 即可看到该 trace 和 span。 alt text 不过需要注意的是,如果客户端调用时 ctx 里没有 traceId,kratos 中间件会生成 traceId,但请求完以后是拿不到 traceId 的。如果有这个需求,可以自己负责创建 span 的逻辑,然后用在新的 ctx 里发请求()。

# 原生 GRPC 客户端创建 trace

Google GRPC 没有这个中间件,需要自己造轮子,实现一个中间件拼 traceparent。

package main

import (
  "context"
  "fmt"
  "log"

  pb "go-kratos-ent-example/api/helloworld/v1"

  "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/exporters/jaeger"
  "go.opentelemetry.io/otel/sdk/resource"
  tracesdk "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
  "go.opentelemetry.io/otel/trace"
  "google.golang.org/grpc"
  "google.golang.org/grpc/metadata"
)

func traceUnaryClientInterceptor() grpc.UnaryClientInterceptor {
  return func(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
  ) error {
    tracer := otel.Tracer("your-tracer-name")
    ctx, span := tracer.Start(ctx, method)
    defer span.End()

    spanContext := span.SpanContext()

    // TODO:可以实现一个 GrpcMetadataCarrier,然后使用 propagator 完成注入
    // 参考源代码:https://github.com/open-telemetry/opentelemetry-go/blob/v1.32.0/propagation/trace_context.go#L52
    // Inject the trace context into the metadata
    md := metadata.Pairs()
    if trace.SpanContextFromContext(ctx).IsValid() {
      traceparent := fmt.Sprintf("00-%s-%s-%2s",
        spanContext.TraceID(),
        spanContext.SpanID(),
        spanContext.TraceFlags() & trace.FlagsSampled,
      )
      log.Print(traceparent)
      md.Set("traceparent", traceparent)
    }

    // Add metadata to outgoing context
    ctx = metadata.NewOutgoingContext(ctx, md)

    // Invoke the original request
    return invoker(ctx, method, req, reply, cc, opts...)
  }
}

func grpcCli() (*grpc.ClientConn, *tracesdk.TracerProvider, error) {
  tp, _ := utils.InitTracer("localhost:4317", "google_grpc_client", "client")
  if err != nil {
    log.Fatalf("Failed to init tracer: %v", err)
  }
  conn, err := grpc.Dial(
    "localhost:9000",
    grpc.WithInsecure(), // 不安全连接
    grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
    grpc.WithUnaryInterceptor(traceUnaryClientInterceptor()),
  )
  if err != nil {
    return nil, tp, err
  }
  return conn, tp, nil
}

func main() {
  conn, tp, err := grpcCli()
  if err != nil {
    log.Fatalf("Failed to connect: %v", err)
  }
  defer conn.Close()

  // 创建 gRPC 客户端
  client := pb.NewGreeterClient(conn)

  ctx, span := otel.Tracer("example-client").Start(context.Background(), "SayHello")
  // 调用 gRPC 方法
  response, err := client.SayHello(ctx, &pb.HelloRequest{Name: "Kratos"})
  if err != nil {
    log.Fatalf("Could not greet: %v", err)
  }

  log.Printf("Greeting: %s", response.Message)

  // 从 span 中获取 Trace ID
  traceID := span.SpanContext().TraceID().String()

  log.Printf("Greeting: %s", response.Message)
  log.Printf("TraceID: %s", traceID)

  tp.ForceFlush(ctx)
}

实现了以后也可以正常创建 trace 并跨应用传递。

alt text

# Kratos HTTP 客户端创建 trace

tracing.Client 同时支持 HTTP 和 GRPC 协议。不过需要注意的是,Client.Do() 方法请求时不会调用中间件,只有 Client.Invoke() 才会。

package main

import (
  "context"
  "fmt"
  "go-kratos-ent-example/internal/utils"
  "log"

  "github.com/go-kratos/kratos/v2/transport/http"

  "github.com/go-kratos/kratos/v2/middleware/tracing"
  "go.opentelemetry.io/otel"

  "go.opentelemetry.io/otel/propagation"
)

func httpClient() (*http.Client, error) {
  conn, err := http.NewClient(
    context.Background(),
    http.WithEndpoint("127.0.0.1:8000"),
    http.WithMiddleware(
      tracing.Client(),
    ),
  )
  return conn, err
}

func main() {
  _, err := utils.InitTracer("localhost:4317", "net_http_client", "client")

  // 设置全局 Propagator
  propagator := propagation.TraceContext{}
  otel.SetTextMapPropagator(propagator)

  if err != nil {
    log.Fatalf("Failed to initialize tracer: %v", err)
  }
  client, err := httpClient()
  if err != nil {
    log.Fatalf("Failed to initialize client: %v", err)
  }

  // 创建一个新的请求上下文
  tracer := otel.Tracer("example-tracer")
  ctx, span := tracer.Start(context.Background(), "client-request")
  defer span.End()

  args := map[string]interface{}{}
  response := map[string]interface{}{}
  // 发起请求
  err = client.Invoke(ctx, "GET", "/helloworld/http", args, &response)
  if err != nil {
    fmt.Println("Error making request:", err)
    return
  }

  fmt.Println("Response message:", response["message"])
}

# 原生 HTTP 客户端创建 trace

原生 HTTP 客户端没有能直接用的中间件,需要实现一个中间件。由于库里自带的 TextMapPropagator 和 HeaderCarrier 直接可用,实现起来很快。

package main

import (
  "context"
  "fmt"
  "net/http"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/exporters/jaeger"
  "go.opentelemetry.io/otel/sdk/resource"
  tracesdk "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
  "go.opentelemetry.io/otel/trace"
)

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
  return f(req)
}

func traceMiddleware(next http.RoundTripper) http.RoundTripper {
  return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
    // 创建一个新的请求上下文
    tracer := otel.Tracer("example-tracer")
    ctx, span := tracer.Start(req.Context(), "client-request")
    defer span.End()

    propagator := otel.GetTextMapPropagator()
    propagator.Inject(ctx, propagation.HeaderCarrier(req.Header))
    fmt.Printf("%v", req.Header)
    // 调用下一个 RoundTripper
    return next.RoundTrip(req)
  })
}

func httpClient() *http.Client {
  // 创建一个 HTTP 客户端
  client := &http.Client{
    Transport: traceMiddleware(http.DefaultTransport),
  }
  return client
}

func main() {
  _, err := utils.InitTracer("localhost:4317", "net_http_client", "client")
  if err != nil {
    log.Fatalf("Failed to initialize tracer: %v", err)
  }
  
  client := httpClient()

  req, err := http.NewRequestWithContext(context.Background(), "GET", "http://localhost:8000/helloworld/http", nil)

  // 发起请求
  resp, err := client.Do(req)
  if err != nil {
    fmt.Println("Error making request:", err)
    return
  }
  defer resp.Body.Close()

  fmt.Println("Response status:", resp.Status)
}

# 原生 HTTP 服务端:需要手动创建 tracer 和 span

配置好 TextMapPropagator 就可以读取 traceparent,解析出 traceId。然后再使用 tracer.Start 创建一个新的 span。

package main

import (
  "fmt"
  "log"
  "net/http"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/exporters/jaeger"
  "go.opentelemetry.io/otel/propagation"
  "go.opentelemetry.io/otel/sdk/resource"
  tracesdk "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)


func main() {
  _, err := utils.InitTracer("localhost:4317", "net_http_server", "server")
  if err != nil {
    log.Fatalf("Failed to initialize tracer: %v", err)
  }
  // 初始化 Tracer
  tracer := otel.Tracer("example-server")

  // 设置全局 Propagator
  propagator := propagation.TraceContext{}
  otel.SetTextMapPropagator(propagator)

  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    // 从请求头中提取上下文
    ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

    // 创建一个新的 Span
    ctx, span := tracer.Start(ctx, "server-request")
    defer span.End()

    // 处理请求
    fmt.Fprintf(w, "Hello, World!")
  })

  log.Println("Server is running on :8080")
  log.Fatal(http.ListenAndServe(":8080", nil))
}

# Python HTTP 服务端读取 trace、创建 tracer 和 span

from flask import Flask, request
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, BatchSpanProcessor
from opentelemetry.propagate import extract

app = Flask(__name__)

# 设置 TracerProvider
trace.set_tracer_provider(TracerProvider(
    resource=Resource.create({"service.name": "python-http-server"})
))

# 配置 Jaeger 导出器
jaeger_exporter = JaegerExporter(
    agent_host_name="python-http-server",
    agent_port=6831,
    collector_endpoint="http://localhost:14268/api/traces",
)

# 添加 Span Processor 和 Exporter
span_processor = SimpleSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# 自动为 Flask 应用添加中间件
FlaskInstrumentor().instrument_app(app)

# 手动添加 WSGI 中间件
app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app)

@app.route("/helloworld/:name")
def hello():
    # 从请求头中提取上下文
    ctx = extract(request.headers)
    tracer = trace.get_tracer(__name__)

    # 创建一个新的 Span
    with tracer.start_as_current_span("server-request", context=ctx):
        return "Hello, World!"

if __name__ == "__main__":
    app.run(debug=True, port=8000)

# Propagator 的接口和造轮子

接口:

// TextMapPropagator propagates cross-cutting concerns as key-value text
// pairs within a carrier that travels in-band across process boundaries.
type TextMapPropagator interface {
  // Inject set cross-cutting concerns from the Context into the carrier.
  Inject(ctx context.Context, carrier TextMapCarrier)

  // Extract reads cross-cutting concerns from the carrier into a Context.
  Extract(ctx context.Context, carrier TextMapCarrier) context.Context
  
  // Fields returns the keys whose values are set with Inject.
  Fields() []string
}

封装:

// pkg/common/trace/propagators.go
// 这个文件实现了多个函数,使得从不同的结构体里存、取 trace 信息。

package trace

import (
  "context"
  "net/http"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/propagation"
  "google.golang.org/grpc/metadata"
)

// InjectTraceToMap 将 ctx 里的 Trace 信息注入到 map 里,以传递给其它服务
func InjectTraceToMap(ctx context.Context, m map[string]any) {
  propagator := otel.GetTextMapPropagator()
  propagator.Inject(ctx, mapCarrier(m))
}

// ExtractTraceFromMap 从 map 里提取 Trace 信息,以恢复上下文
func ExtractTraceFromMap(ctx context.Context, m map[string]any) context.Context {
  propagator := otel.GetTextMapPropagator()
  return propagator.Extract(ctx, mapCarrier(m))
}

// InjectTraceToHeader 将 ctx 里的 Trace 信息注入到 http.Header 里,以传递给其它服务
func InjectTraceToHeader(ctx context.Context, h http.Header) {
  propagator := otel.GetTextMapPropagator()
  propagator.Inject(ctx, propagation.HeaderCarrier(h))
}

// ExtractTraceFromHeader 从 http.Header 里提取 Trace 信息,以恢复上下文
func ExtractTraceFromHeader(ctx context.Context, h http.Header) context.Context {
  propagator := otel.GetTextMapPropagator()
  return propagator.Extract(ctx, propagation.HeaderCarrier(h))
}

// InjectTraceToGrpcMetadata 将 ctx 里的 Trace 信息注入到 grpc metadata 里,以传递给其它服务
//
// 如何将 metadata 注入 ctx: ctx := metadata.NewOutgoingContext(ctx, md)
func InjectTraceToGrpcMetadata(ctx context.Context, md metadata.MD) {
  propagator := otel.GetTextMapPropagator()
  propagator.Inject(ctx, grpcMetadataCarrier{})
}

// ExtractTraceFromGrpcMetadata 从 grpc metadata 里提取 Trace 信息,以恢复上下文
//
// 如何从 ctx 里获取 metadata:md, ok := metadata.FromIncomingContext(ctx)
func ExtractTraceFromGrpcMetadata(ctx context.Context, md metadata.MD) context.Context {
  propagator := otel.GetTextMapPropagator()
  return propagator.Extract(ctx, grpcMetadataCarrier(md))
}
// pkg/common/trace/textcarriers.go
// 这个文件使用不同的私有类型实现了 propagation.TextMapCarrier 接口,目的是让 propagators 可以从不同的结构体里存、取 trace 信息。

package trace

import (
  "golang.org/x/exp/maps"
  "google.golang.org/grpc/metadata"
)

type mapCarrier map[string]any

// Get implements propagation.TextMapCarrier.
func (m mapCarrier) Get(key string) string {
  if v, ok := m[key]; ok {
    return v.(string)
  }
  return ""
}

// Keys implements propagation.TextMapCarrier.
func (m mapCarrier) Keys() []string {
  return maps.Keys(m)
}

// Set implements propagation.TextMapCarrier.
func (m mapCarrier) Set(key string, value string) {
  m[key] = value
}

type grpcMetadataCarrier metadata.MD

// Get implements propagation.TextMapCarrier.
func (m grpcMetadataCarrier) Get(key string) string {
  values := metadata.MD(m).Get(key)
  if len(values) > 0 {
    return values[0]
  }
  return ""
}

// Keys implements propagation.TextMapCarrier.
func (m grpcMetadataCarrier) Keys() []string {
  return maps.Keys(metadata.MD(m))
}

// Set implements propagation.TextMapCarrier.
func (m grpcMetadataCarrier) Set(key string, value string) {
  metadata.MD(m).Set(key, value)
}