From 498778b0412e97402b954818a94c9ed45548fc0f Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Fri, 22 Aug 2025 11:56:56 +0200 Subject: [PATCH 01/12] feat(log): added gRPC access logs --- go.mod | 2 +- log/examples_test.go | 45 +++ log/grpc_access_logger.go | 168 +++++++++++ log/grpc_access_logger_fields.go | 32 +++ log/grpc_access_logger_options.go | 58 ++++ log/grpc_access_logger_test.go | 269 ++++++++++++++++++ ...access_logger.go => http_access_logger.go} | 0 ...fields.go => http_access_logger_fields.go} | 0 ...t.go => http_access_logger_go1_20_test.go} | 0 ...tions.go => http_access_logger_options.go} | 0 ...ger_test.go => http_access_logger_test.go} | 0 11 files changed, 573 insertions(+), 1 deletion(-) create mode 100644 log/grpc_access_logger.go create mode 100644 log/grpc_access_logger_fields.go create mode 100644 log/grpc_access_logger_options.go create mode 100644 log/grpc_access_logger_test.go rename log/{access_logger.go => http_access_logger.go} (100%) rename log/{access_logger_fields.go => http_access_logger_fields.go} (100%) rename log/{access_logger_go1_20_test.go => http_access_logger_go1_20_test.go} (100%) rename log/{access_logger_options.go => http_access_logger_options.go} (100%) rename log/{access_logger_test.go => http_access_logger_test.go} (100%) diff --git a/go.mod b/go.mod index 080630c1..5a6380f9 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( golang.org/x/crypto v0.41.0 google.golang.org/api v0.54.0 google.golang.org/grpc v1.40.0 + google.golang.org/protobuf v1.34.2 gopkg.in/DataDog/dd-trace-go.v1 v1.32.0 ) @@ -74,6 +75,5 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210813162853-db860fec028c // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/log/examples_test.go b/log/examples_test.go index 2786dcc6..86a076af 100644 --- 
a/log/examples_test.go +++ b/log/examples_test.go @@ -1,10 +1,13 @@ package log_test import ( + "context" "fmt" "net/http" "gitlab.com/gitlab-org/labkit/log" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) func ExampleInitialize() { @@ -48,3 +51,45 @@ func ExampleAccessLogger() { ), ) } + +func ExampleUnaryServerInterceptor() { + // This func is used by WithGrpcExtraFields to add additional fields to the logger + extraFieldGenerator := func(ctx context.Context, fullMethodName string) log.Fields { + if md, ok := metadata.FromIncomingContext(ctx); ok { + if magicalHeader := md.Get("X-Magical-Header"); len(magicalHeader) > 0 { + return log.Fields{"header": magicalHeader[0]} + } + } + return nil + } + + grpc.NewServer( + grpc.ChainUnaryInterceptor( + log.UnaryServerInterceptor( + log.WithGrpcExtraFields(extraFieldGenerator), + log.WithGrpcFieldsExcluded(log.GrpcPeerAddress), + ), + ), + ) +} + +func ExampleStreamServerInterceptor() { + // This func is used by WithGrpcExtraFields to add additional fields to the logger + extraFieldGenerator := func(ctx context.Context, fullMethodName string) log.Fields { + if md, ok := metadata.FromIncomingContext(ctx); ok { + if magicalHeader := md.Get("X-Magical-Header"); len(magicalHeader) > 0 { + return log.Fields{"header": magicalHeader[0]} + } + } + return nil + } + + grpc.NewServer( + grpc.ChainStreamInterceptor( + log.StreamServerInterceptor( + log.WithGrpcExtraFields(extraFieldGenerator), + log.WithGrpcFieldsExcluded(log.GrpcPeerAddress), + ), + ), + ) +} diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go new file mode 100644 index 00000000..426f805d --- /dev/null +++ b/log/grpc_access_logger.go @@ -0,0 +1,168 @@ +package log + +import ( + "context" + "strings" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + 
"google.golang.org/protobuf/proto" +) + +// UnaryServerInterceptor returns a new unary server interceptor that logs gRPC requests. +func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { + config := applyGrpcAccessLoggerOptions(opts) + + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + startTime := time.Now() + + resp, err := handler(ctx, req) + + duration := time.Since(startTime) + st := status.Convert(err) + fields := accessLogFields(ctx, info.FullMethod, duration, st, req, resp, &config) + + level := levelFromCode(st.Code()) + config.logger.WithFields(fields).Log(level, "access") + + return resp, err + } +} + +// StreamServerInterceptor returns a new stream server interceptor that logs gRPC requests. +func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerInterceptor { + config := applyGrpcAccessLoggerOptions(opts) + + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + + err := handler(srv, ss) + + duration := time.Since(startTime) + st := status.Convert(err) + fields := accessLogFields(ss.Context(), info.FullMethod, duration, st, nil, nil, &config) + + level := levelFromCode(st.Code()) + config.logger.WithFields(fields).Log(level, "access") + + return err + } +} + +func levelFromCode(code codes.Code) logrus.Level { + switch code { + case codes.OK: + return logrus.InfoLevel + case codes.Canceled, + codes.InvalidArgument, + codes.DeadlineExceeded, + codes.NotFound, + codes.AlreadyExists, + codes.PermissionDenied, + codes.ResourceExhausted, + codes.FailedPrecondition, + codes.Aborted, + codes.OutOfRange, + codes.Unimplemented: + return logrus.WarnLevel + case codes.Unknown, + codes.Internal, + codes.Unavailable, + codes.DataLoss, + codes.Unauthenticated: + return logrus.ErrorLevel + default: + return logrus.ErrorLevel + } +} + +// parseGRPCMethod safely 
extracts service and method from full method name. +func parseGRPCMethod(fullMethod string) (service, method string) { + if fullMethod == "" { + return "", "" + } + + fullMethod = strings.TrimPrefix(fullMethod, "/") + + parts := strings.SplitN(fullMethod, "/", 2) + + switch len(parts) { + case 2: + service = parts[0] + method = parts[1] + + method = strings.TrimSuffix(method, "/") + + case 1: + + method = parts[0] + + default: + return "", "" + } + + return service, method +} + +func accessLogFields( + ctx context.Context, fullMethod string, duration time.Duration, + st *status.Status, req, resp any, config *grpcAccessLoggerConfig, +) logrus.Fields { + fields := config.extraFields(ctx, fullMethod) + fieldsBitMask := config.fields + + if fieldsBitMask&GrpcCorrelationID != 0 { + fields[correlation.FieldName] = correlation.ExtractFromContext(ctx) + } + + service, method := parseGRPCMethod(fullMethod) + + if fieldsBitMask&GrpcService != 0 { + fields[grpcServiceField] = service + } + + if fieldsBitMask&GrpcMethod != 0 { + fields[grpcMethodField] = method + } + + if fieldsBitMask&GrpcStatus != 0 { + fields[grpcStatusField] = st.Code().String() + } + + if fieldsBitMask&GrpcPeerAddress != 0 { + if p, ok := peer.FromContext(ctx); ok { + fields[grpcPeerAddressField] = p.Addr.String() + } + } + + if fieldsBitMask&GrpcRequestDuration != 0 { + fields[grpcRequestDurationField] = duration.Milliseconds() + } + + if fieldsBitMask&GrpcSystem != 0 { + fields[grpcSystemField] = "grpc" + } + + if fieldsBitMask&GrpcRequestSize != 0 { + if req != nil { + if p, ok := req.(proto.Message); ok { + fields[grpcRequestSizeField] = proto.Size(p) + } + } + } + + if fieldsBitMask&GrpcResponseSize != 0 { + if resp != nil { + if p, ok := resp.(proto.Message); ok { + fields[grpcResponseSizeField] = proto.Size(p) + } + } + } + + return fields +} diff --git a/log/grpc_access_logger_fields.go b/log/grpc_access_logger_fields.go new file mode 100644 index 00000000..61455c48 --- /dev/null +++ 
b/log/grpc_access_logger_fields.go @@ -0,0 +1,32 @@ +package log + +// GrpcAccessLogField is used to select which fields are recorded in the access log. See WithGrpcFieldsExcluded. +type GrpcAccessLogField uint16 + +// GrpcAccessLogField defines bit flags for configuring which fields to include in gRPC access logs +// Combine multiple fields using bitwise OR: logConfig := GrpcCorrelationID | GrpcService | GrpcStatus +// Check if field is enabled using AND: if logConfig & GrpcService != 0 { ... }. +const ( + GrpcCorrelationID GrpcAccessLogField = 1 << iota + GrpcSystem + GrpcService + GrpcMethod + GrpcStatus + GrpcPeerAddress + GrpcRequestDuration + GrpcRequestSize + GrpcResponseSize +) + +const defaultGrpcEnabledFields = ^GrpcAccessLogField(0) // By default, all fields are enabled + +const ( + grpcServiceField = "grpc.service" + grpcMethodField = "grpc.method" + grpcStatusField = "grpc.code" + grpcPeerAddressField = "grpc.peer.address" + grpcRequestDurationField = "duration_ms" + grpcSystemField = "system" + grpcRequestSizeField = "grpc.request.size" + grpcResponseSizeField = "grpc.response.size" +) diff --git a/log/grpc_access_logger_options.go b/log/grpc_access_logger_options.go new file mode 100644 index 00000000..6e09b9ba --- /dev/null +++ b/log/grpc_access_logger_options.go @@ -0,0 +1,58 @@ +package log + +import ( + "context" + + "github.com/sirupsen/logrus" +) + +// GrpcExtraFieldsGeneratorFunc allows extra fields to be included in the access log. +type GrpcExtraFieldsGeneratorFunc func(ctx context.Context, fullMethodName string) Fields + +// The configuration for a gRPC access logger. +type grpcAccessLoggerConfig struct { + logger *logrus.Logger + extraFields GrpcExtraFieldsGeneratorFunc + fields GrpcAccessLogField +} + +func nullGrpcExtraFieldsGenerator(ctx context.Context, fullMethodName string) Fields { + return Fields{} +} + +// GrpcAccessLoggerOption will configure a access logger handler. 
+type GrpcAccessLoggerOption func(*grpcAccessLoggerConfig) + +func applyGrpcAccessLoggerOptions(opts []GrpcAccessLoggerOption) grpcAccessLoggerConfig { + config := grpcAccessLoggerConfig{ + logger: logger, + extraFields: nullGrpcExtraFieldsGenerator, + fields: defaultGrpcEnabledFields, + } + for _, v := range opts { + v(&config) + } + + return config +} + +// WithGrpcExtraFields allows extra fields to be passed into the access logger, based on the request. +func WithGrpcExtraFields(f GrpcExtraFieldsGeneratorFunc) GrpcAccessLoggerOption { + return func(config *grpcAccessLoggerConfig) { + config.extraFields = f + } +} + +// WithGrpcFieldsExcluded allows fields to be excluded from the access log. +func WithGrpcFieldsExcluded(fields GrpcAccessLogField) GrpcAccessLoggerOption { + return func(config *grpcAccessLoggerConfig) { + config.fields &^= fields + } +} + +// WithGrpcAccessLogger configures the logger to be used with the access logger. +func WithGrpcAccessLogger(logger *logrus.Logger) GrpcAccessLoggerOption { + return func(config *grpcAccessLoggerConfig) { + config.logger = logger + } +} diff --git a/log/grpc_access_logger_test.go b/log/grpc_access_logger_test.go new file mode 100644 index 00000000..fcedbdf1 --- /dev/null +++ b/log/grpc_access_logger_test.go @@ -0,0 +1,269 @@ +package log + +import ( + "bytes" + "context" + "io" + "net" + "strings" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/status" +) + +const ( + grpcTestReqSize = 10 + grpcTestRespSize = 20 +) + +func TestUnaryServerInterceptor(t *testing.T) { + tests := []struct { + name string + logMatchers []string + opts []GrpcAccessLoggerOption + handler func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, 
error) + }{ + { + name: "trivial", + logMatchers: []string{ + `"grpc.service":"grpc.testing.TestService"`, + `"grpc.method":"UnaryCall"`, + `"grpc.code":"OK"`, + `"grpc.peer.address":".*"`, + `"duration_ms":\d+`, + `"system":"grpc"`, + `"correlation_id":".*"`, + `"grpc.request.size":\d+`, + `"grpc.response.size":\d+`, + }, + }, + { + name: "error", + handler: func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return nil, status.Error(codes.NotFound, "not found") + }, + logMatchers: []string{ + `"grpc.code":"NotFound"`, + }, + }, + { + name: "extra_fields", + opts: []GrpcAccessLoggerOption{ + WithGrpcExtraFields(func(ctx context.Context, method string) logrus.Fields { + return logrus.Fields{"testfield": "testvalue"} + }), + }, + logMatchers: []string{ + `"testfield":"testvalue"`, + }, + }, + { + name: "excluded_fields", + opts: []GrpcAccessLoggerOption{ + WithGrpcFieldsExcluded(GrpcService | GrpcMethod | GrpcRequestSize | GrpcResponseSize), + }, + logMatchers: []string{ + `"grpc.code":"OK"`, + `"grpc.peer.address":".*"`, + `"duration_ms":\d+`, + `"system":"grpc"`, + `"correlation_id":".*"`, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buf := &bytes.Buffer{} + logger := newLogger(buf) + opts := tt.opts + opts = append(opts, WithGrpcAccessLogger(logger)) + + handler := tt.handler + if handler == nil { + handler = func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{Payload: &grpc_testing.Payload{Body: make([]byte, grpcTestRespSize)}}, nil + } + } + + server := newTestGrpcServer(UnaryServerInterceptor(opts...)) + server.service.UnaryCallF = handler + + conn := newTestGrpcClient(t, server.listener.Addr().String()) + client := grpc_testing.NewTestServiceClient(conn) + + ctx := correlation.ContextWithCorrelation(context.Background(), "test-correlation-id") + _, err := client.UnaryCall(ctx, 
&grpc_testing.SimpleRequest{ + Payload: &grpc_testing.Payload{Body: make([]byte, grpcTestReqSize)}, + }) + + if strings.Contains(tt.name, "error") { + require.Error(t, err) + s := status.Convert(err) + require.Equal(t, codes.NotFound, s.Code()) + } else { + require.NoError(t, err) + } + + logString := buf.String() + require.Contains(t, logString, `"msg":"access"`) + for _, v := range tt.logMatchers { + require.Regexp(t, v, logString) + } + + if strings.Contains(tt.name, "excluded_fields") { + require.NotContains(t, logString, `"grpc.service"`) + require.NotContains(t, logString, `"grpc.method"`) + require.NotContains(t, logString, `"grpc.request.size"`) + require.NotContains(t, logString, `"grpc.response.size"`) + } + }) + } +} + +func TestStreamServerInterceptor(t *testing.T) { + tests := []struct { + name string + logMatchers []string + opts []GrpcAccessLoggerOption + handler func(stream grpc_testing.TestService_FullDuplexCallServer) error + }{ + { + name: "trivial", + logMatchers: []string{ + `"grpc.service":"grpc.testing.TestService"`, + `"grpc.method":"FullDuplexCall"`, + `"grpc.code":"OK"`, + `"grpc.peer.address":".*"`, + `"duration_ms":\d+`, + `"system":"grpc"`, + `"correlation_id":".*"`, + }, + }, + { + name: "error", + handler: func(stream grpc_testing.TestService_FullDuplexCallServer) error { + return status.Error(codes.NotFound, "not found") + }, + logMatchers: []string{ + `"grpc.code":"NotFound"`, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buf := &bytes.Buffer{} + logger := newLogger(buf) + opts := tt.opts + opts = append(opts, WithGrpcAccessLogger(logger)) + + handler := tt.handler + if handler == nil { + handler = func(stream grpc_testing.TestService_FullDuplexCallServer) error { + return nil + } + } + + server := newTestGrpcServer(nil, StreamServerInterceptor(opts...)) + server.service.FullDuplexCallF = handler + + conn := newTestGrpcClient(t, server.listener.Addr().String()) + client := 
grpc_testing.NewTestServiceClient(conn) + + ctx := correlation.ContextWithCorrelation(context.Background(), "test-correlation-id") + stream, err := client.FullDuplexCall(ctx) + require.NoError(t, err) + + // We need to receive to ensure the stream handler has completed + _, err = stream.Recv() + if strings.Contains(tt.name, "error") { + require.Error(t, err) + s, _ := status.FromError(err) + require.Equal(t, codes.NotFound, s.Code()) + } else { + require.ErrorIs(t, err, io.EOF) + } + + logString := buf.String() + require.Contains(t, logString, `"msg":"access"`) + for _, v := range tt.logMatchers { + require.Regexp(t, v, logString) + } + }) + } +} + +func newLogger(w io.Writer) *logrus.Logger { + logger := logrus.New() + logger.Out = w + logger.Formatter = &logrus.JSONFormatter{} + return logger +} + +type testService struct { + grpc_testing.UnimplementedTestServiceServer + UnaryCallF func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) + FullDuplexCallF func(stream grpc_testing.TestService_FullDuplexCallServer) error +} + +func (s *testService) UnaryCall(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + if s.UnaryCallF != nil { + return s.UnaryCallF(ctx, req) + } + return &grpc_testing.SimpleResponse{}, nil +} + +func (s *testService) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error { + if s.FullDuplexCallF != nil { + return s.FullDuplexCallF(stream) + } + return nil +} + +type testGrpcServer struct { + listener net.Listener + server *grpc.Server + service *testService +} + +func newTestGrpcServer(unaryInterceptor grpc.UnaryServerInterceptor, streamInterceptor ...grpc.StreamServerInterceptor) *testGrpcServer { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + + service := &testService{} + opts := []grpc.ServerOption{} + if unaryInterceptor != nil { + opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor)) + 
} + if len(streamInterceptor) > 0 { + opts = append(opts, grpc.StreamInterceptor(streamInterceptor[0])) + } + + server := grpc.NewServer(opts...) + grpc_testing.RegisterTestServiceServer(server, service) + + go server.Serve(listener) + + return &testGrpcServer{ + listener: listener, + server: server, + service: service, + } +} + +func newTestGrpcClient(t *testing.T, addr string) *grpc.ClientConn { + t.Helper() + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + return conn +} diff --git a/log/access_logger.go b/log/http_access_logger.go similarity index 100% rename from log/access_logger.go rename to log/http_access_logger.go diff --git a/log/access_logger_fields.go b/log/http_access_logger_fields.go similarity index 100% rename from log/access_logger_fields.go rename to log/http_access_logger_fields.go diff --git a/log/access_logger_go1_20_test.go b/log/http_access_logger_go1_20_test.go similarity index 100% rename from log/access_logger_go1_20_test.go rename to log/http_access_logger_go1_20_test.go diff --git a/log/access_logger_options.go b/log/http_access_logger_options.go similarity index 100% rename from log/access_logger_options.go rename to log/http_access_logger_options.go diff --git a/log/access_logger_test.go b/log/http_access_logger_test.go similarity index 100% rename from log/access_logger_test.go rename to log/http_access_logger_test.go -- GitLab From edbb586564289acf7ac8e5915033ce95efd13efa Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Tue, 16 Sep 2025 13:45:00 +0200 Subject: [PATCH 02/12] refactored to use standard lib --- go.mod | 14 +- go.sum | 26 ++-- log/grpc_access_logger.go | 227 +++++++++++++----------------- log/grpc_access_logger_fields.go | 32 ++--- log/grpc_access_logger_options.go | 36 +++-- 5 files changed, 153 insertions(+), 182 deletions(-) diff --git a/go.mod b/go.mod index 5a6380f9..121a0e9f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( 
github.com/getsentry/raven-go v0.2.0 github.com/getsentry/sentry-go v0.13.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 github.com/lightstep/lightstep-tracer-go v0.25.0 github.com/oklog/ulid/v2 v2.0.2 github.com/opentracing/opentracing-go v1.2.0 @@ -15,14 +16,13 @@ require ( github.com/prometheus/client_model v0.6.1 github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a github.com/sirupsen/logrus v1.9.3 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/uber/jaeger-client-go v2.29.1+incompatible gitlab.com/gitlab-org/go/reopen v1.0.0 go.opencensus.io v0.23.0 golang.org/x/crypto v0.41.0 google.golang.org/api v0.54.0 - google.golang.org/grpc v1.40.0 - google.golang.org/protobuf v1.34.2 + google.golang.org/grpc v1.67.1 gopkg.in/DataDog/dd-trace-go.v1 v1.32.0 ) @@ -36,7 +36,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/aws/aws-sdk-go v1.37.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect + github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -44,9 +44,8 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20210804190019-f964ff605595 // indirect - github.com/google/uuid v1.1.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -74,6 +73,7 @@ require ( golang.org/x/time 
v0.0.0-20201208040808-7e3f01d25324 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20210813162853-db860fec028c // indirect + google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect + google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8c933563..7653da72 100644 --- a/go.sum +++ b/go.sum @@ -61,7 +61,6 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.1 h1:cJXY5VLMHgejurPjZH6Fo9rIwRGLef github.com/HdrHistogram/hdrhistogram-go v1.1.1/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -71,11 +70,11 @@ github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zK github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/census-instrumentation/opencensus-proto v0.4.1 
h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= -github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -193,13 +192,16 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210804190019-f964ff605595 h1:uNrRgpnKjTfxu4qHaZAAs3eKTYV1EzGF3dAykpnxgDE= github.com/google/pprof v0.0.0-20210804190019-f964ff605595/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= 
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -280,7 +282,6 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= @@ -291,8 +292,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tinylib/msgp v1.1.2 
h1:gWmO7n0Ys2RBEb7GPYB9Ujq8Mk5p2U08lRnmMcGy6BQ= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= @@ -662,8 +663,9 @@ google.golang.org/genproto v0.0.0-20210713002101-d411969a0d9a/go.mod h1:AxrInvYm google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k= google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= -google.golang.org/genproto v0.0.0-20210813162853-db860fec028c h1:iLQakcwWG3k/++1q/46apVb1sUQ3IqIdn9yUE6eh/xA= google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w= +google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc h1:Nf+EdcTLHR8qDNN/KfkQL0u0ssxt9OhbaWCl5C0ucEI= +google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -689,8 +691,8 @@ google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= -google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= -google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= 
+google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -705,8 +707,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/DataDog/dd-trace-go.v1 v1.32.0 h1:DkD0plWEVUB8v/Ru6kRBW30Hy/fRNBC8hPdcExuBZMc= gopkg.in/DataDog/dd-trace-go.v1 v1.32.0/go.mod h1:wRKMf/tRASHwH/UOfPQ3IQmVFhTz2/1a1/mpXoIjF54= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index 426f805d..c64c501a 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -2,167 +2,140 @@ package log import ( "context" - "strings" - "time" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" ) -// 
UnaryServerInterceptor returns a new unary server interceptor that logs gRPC requests. func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - startTime := time.Now() - - resp, err := handler(ctx, req) - - duration := time.Since(startTime) - st := status.Convert(err) - fields := accessLogFields(ctx, info.FullMethod, duration, st, req, resp, &config) - - level := levelFromCode(st.Code()) - config.logger.WithFields(fields).Log(level, "access") - - return resp, err - } + return logging.UnaryServerInterceptor( + InterceptorLogger(config.logger), + logging.WithLogOnEvents(logging.FinishCall), + logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { + return extractLabkitFields(ctx, &config) + }), + logging.WithLevels(levelFromCode), + ) } -// StreamServerInterceptor returns a new stream server interceptor that logs gRPC requests. 
func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - startTime := time.Now() - - err := handler(srv, ss) - - duration := time.Since(startTime) - st := status.Convert(err) - fields := accessLogFields(ss.Context(), info.FullMethod, duration, st, nil, nil, &config) - - level := levelFromCode(st.Code()) - config.logger.WithFields(fields).Log(level, "access") - - return err - } + return logging.StreamServerInterceptor( + InterceptorLogger(config.logger), + logging.WithLogOnEvents(logging.FinishCall), + logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { + return extractLabkitFields(ctx, &config) + }), + logging.WithLevels(levelFromCode), + ) } -func levelFromCode(code codes.Code) logrus.Level { - switch code { - case codes.OK: - return logrus.InfoLevel - case codes.Canceled, - codes.InvalidArgument, - codes.DeadlineExceeded, - codes.NotFound, - codes.AlreadyExists, - codes.PermissionDenied, - codes.ResourceExhausted, - codes.FailedPrecondition, - codes.Aborted, - codes.OutOfRange, - codes.Unimplemented: - return logrus.WarnLevel - case codes.Unknown, - codes.Internal, - codes.Unavailable, - codes.DataLoss, - codes.Unauthenticated: - return logrus.ErrorLevel - default: - return logrus.ErrorLevel - } -} - -// parseGRPCMethod safely extracts service and method from full method name. 
-func parseGRPCMethod(fullMethod string) (service, method string) { - if fullMethod == "" { - return "", "" - } - - fullMethod = strings.TrimPrefix(fullMethod, "/") - - parts := strings.SplitN(fullMethod, "/", 2) +func InterceptorLogger(l *logrus.Logger) logging.Logger { + return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { + logrusFields := make(logrus.Fields) - switch len(parts) { - case 2: - service = parts[0] - method = parts[1] - - method = strings.TrimSuffix(method, "/") - - case 1: - - method = parts[0] + // Convert fields to logrus format + for i := 0; i < len(fields); i += 2 { + if i+1 < len(fields) { + if key, ok := fields[i].(string); ok { + logrusFields[key] = fields[i+1] + } + } + } - default: - return "", "" - } + // Convert logging.Level to logrus.Level + var logrusLevel logrus.Level + switch lvl { + case logging.LevelDebug: + logrusLevel = logrus.DebugLevel + case logging.LevelInfo: + logrusLevel = logrus.InfoLevel + case logging.LevelWarn: + logrusLevel = logrus.WarnLevel + case logging.LevelError: + logrusLevel = logrus.ErrorLevel + default: + logrusLevel = logrus.InfoLevel + } - return service, method + l.WithFields(logrusFields).Log(logrusLevel, msg) + }) } -func accessLogFields( - ctx context.Context, fullMethod string, duration time.Duration, - st *status.Status, req, resp any, config *grpcAccessLoggerConfig, -) logrus.Fields { - fields := config.extraFields(ctx, fullMethod) +func extractLabkitFields(ctx context.Context, config *grpcAccessLoggerConfig) logging.Fields { + var fields []any fieldsBitMask := config.fields + // Add correlation ID if enabled if fieldsBitMask&GrpcCorrelationID != 0 { - fields[correlation.FieldName] = correlation.ExtractFromContext(ctx) - } - - service, method := parseGRPCMethod(fullMethod) - - if fieldsBitMask&GrpcService != 0 { - fields[grpcServiceField] = service - } - - if fieldsBitMask&GrpcMethod != 0 { - fields[grpcMethodField] = method - } - - if 
fieldsBitMask&GrpcStatus != 0 { - fields[grpcStatusField] = st.Code().String() - } - - if fieldsBitMask&GrpcPeerAddress != 0 { - if p, ok := peer.FromContext(ctx); ok { - fields[grpcPeerAddressField] = p.Addr.String() + if corrID := correlation.ExtractFromContext(ctx); corrID != "" { + fields = append(fields, correlation.FieldName, corrID) } } - if fieldsBitMask&GrpcRequestDuration != 0 { - fields[grpcRequestDurationField] = duration.Milliseconds() - } - + // Add system field if enabled if fieldsBitMask&GrpcSystem != 0 { - fields[grpcSystemField] = "grpc" - } - - if fieldsBitMask&GrpcRequestSize != 0 { - if req != nil { - if p, ok := req.(proto.Message); ok { - fields[grpcRequestSizeField] = proto.Size(p) - } - } + fields = append(fields, "system", "grpc") } - if fieldsBitMask&GrpcResponseSize != 0 { - if resp != nil { - if p, ok := resp.(proto.Message); ok { - fields[grpcResponseSizeField] = proto.Size(p) + // Add extra fields from user configuration + if config.extraFields != nil { + if extraFields := config.extraFields(ctx, ""); extraFields != nil { + for k, v := range extraFields { + fields = append(fields, k, v) } } } return fields } + +// func extractLabkitFields(ctx context.Context, config *grpcAccessLoggerConfig) logging.Fields { +// fields := make(logging.Fields) +// fieldsBitMask := config.fields +// +// // Add correlation ID if enabled +// if fieldsBitMask&GrpcCorrelationID != 0 { +// if corrID := correlation.ExtractFromContext(ctx); corrID != "" { +// fields[correlation.FieldName] = corrID +// } +// } +// +// // Add system field if enabled +// if fieldsBitMask&GrpcSystem != 0 { +// fields["system"] = "grpc" +// } +// +// // Add extra fields from user configuration +// if config.extraFields != nil { +// if extraFields := config.extraFields(ctx, ""); extraFields != nil { +// for k, v := range extraFields { +// fields[k] = v +// } +// } +// } +// +// return fields +// } +func levelFromCode(c codes.Code) logging.Level { + switch c { + case codes.OK: + 
return logging.LevelInfo + case codes.Canceled, codes.InvalidArgument, codes.DeadlineExceeded, + codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, + codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, + codes.OutOfRange, codes.Unimplemented: + return logging.LevelWarn + case codes.Unknown, codes.Internal, codes.Unavailable, + codes.DataLoss, codes.Unauthenticated: + return logging.LevelError + default: + return logging.LevelError + } +} diff --git a/log/grpc_access_logger_fields.go b/log/grpc_access_logger_fields.go index 61455c48..754160a5 100644 --- a/log/grpc_access_logger_fields.go +++ b/log/grpc_access_logger_fields.go @@ -1,32 +1,22 @@ package log -// GrpcAccessLogField is used to select which fields are recorded in the access log. See WithGrpcFieldsExcluded. +// GrpcAccessLogField is used to select which fields are recorded in the access log type GrpcAccessLogField uint16 // GrpcAccessLogField defines bit flags for configuring which fields to include in gRPC access logs // Combine multiple fields using bitwise OR: logConfig := GrpcCorrelationID | GrpcService | GrpcStatus -// Check if field is enabled using AND: if logConfig & GrpcService != 0 { ... }. +// Check if field is enabled using AND: if logConfig & GrpcService != 0 { ... 
} const ( GrpcCorrelationID GrpcAccessLogField = 1 << iota GrpcSystem - GrpcService - GrpcMethod - GrpcStatus - GrpcPeerAddress - GrpcRequestDuration - GrpcRequestSize - GrpcResponseSize + GrpcService // Will be handled by middleware automatically + GrpcMethod // Will be handled by middleware automatically + GrpcStatus // Will be handled by middleware automatically + GrpcPeerAddress // Will be handled by middleware automatically + GrpcRequestDuration // Will be handled by middleware automatically + GrpcRequestSize // Will be handled by middleware automatically + GrpcResponseSize // Will be handled by middleware automatically ) -const defaultGrpcEnabledFields = ^GrpcAccessLogField(0) // By default, all fields are enabled - -const ( - grpcServiceField = "grpc.service" - grpcMethodField = "grpc.method" - grpcStatusField = "grpc.code" - grpcPeerAddressField = "grpc.peer.address" - grpcRequestDurationField = "duration_ms" - grpcSystemField = "system" - grpcRequestSizeField = "grpc.request.size" - grpcResponseSizeField = "grpc.response.size" -) +// Default to only labkit-specific fields, let middleware handle standard gRPC fields +const defaultGrpcEnabledFields = GrpcCorrelationID | GrpcSystem diff --git a/log/grpc_access_logger_options.go b/log/grpc_access_logger_options.go index 6e09b9ba..0fd44c14 100644 --- a/log/grpc_access_logger_options.go +++ b/log/grpc_access_logger_options.go @@ -2,57 +2,63 @@ package log import ( "context" - + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" ) -// GrpcExtraFieldsGeneratorFunc allows extra fields to be included in the access log. +// GrpcExtraFieldsGeneratorFunc allows extra fields to be included in the access log type GrpcExtraFieldsGeneratorFunc func(ctx context.Context, fullMethodName string) Fields -// The configuration for a gRPC access logger. 
+// grpcAccessLoggerConfig holds the configuration for gRPC access logging type grpcAccessLoggerConfig struct { logger *logrus.Logger extraFields GrpcExtraFieldsGeneratorFunc fields GrpcAccessLogField + logEvents []logging.LoggableEvent // New: control when to log } -func nullGrpcExtraFieldsGenerator(ctx context.Context, fullMethodName string) Fields { - return Fields{} -} - -// GrpcAccessLoggerOption will configure a access logger handler. +// GrpcAccessLoggerOption configures the gRPC access logger type GrpcAccessLoggerOption func(*grpcAccessLoggerConfig) func applyGrpcAccessLoggerOptions(opts []GrpcAccessLoggerOption) grpcAccessLoggerConfig { config := grpcAccessLoggerConfig{ - logger: logger, - extraFields: nullGrpcExtraFieldsGenerator, + logger: logger, // default logger from labkit + extraFields: nil, fields: defaultGrpcEnabledFields, + logEvents: []logging.LoggableEvent{logging.FinishCall}, // Default: only log on finish } - for _, v := range opts { - v(&config) + + for _, opt := range opts { + opt(&config) } return config } -// WithGrpcExtraFields allows extra fields to be passed into the access logger, based on the request. +// WithGrpcExtraFields allows extra fields to be added to the access log func WithGrpcExtraFields(f GrpcExtraFieldsGeneratorFunc) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.extraFields = f } } -// WithGrpcFieldsExcluded allows fields to be excluded from the access log. +// WithGrpcFieldsExcluded excludes specific fields from the access log func WithGrpcFieldsExcluded(fields GrpcAccessLogField) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.fields &^= fields } } -// WithGrpcAccessLogger configures the logger to be used with the access logger. 
+// WithGrpcAccessLogger sets a custom logger for the access logger func WithGrpcAccessLogger(logger *logrus.Logger) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.logger = logger } } + +// WithGrpcLogEvents controls when to log (start, finish, or both) +func WithGrpcLogEvents(events ...logging.LoggableEvent) GrpcAccessLoggerOption { + return func(config *grpcAccessLoggerConfig) { + config.logEvents = events + } +} -- GitLab From 38983a6aff9213b438cd32636ea4d7637d3e00cb Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Tue, 16 Sep 2025 14:13:11 +0200 Subject: [PATCH 03/12] added sizes as default fields --- log/grpc_access_logger_fields.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/log/grpc_access_logger_fields.go b/log/grpc_access_logger_fields.go index 754160a5..be17e299 100644 --- a/log/grpc_access_logger_fields.go +++ b/log/grpc_access_logger_fields.go @@ -9,14 +9,14 @@ type GrpcAccessLogField uint16 const ( GrpcCorrelationID GrpcAccessLogField = 1 << iota GrpcSystem - GrpcService // Will be handled by middleware automatically - GrpcMethod // Will be handled by middleware automatically - GrpcStatus // Will be handled by middleware automatically - GrpcPeerAddress // Will be handled by middleware automatically - GrpcRequestDuration // Will be handled by middleware automatically - GrpcRequestSize // Will be handled by middleware automatically - GrpcResponseSize // Will be handled by middleware automatically + GrpcService + GrpcMethod + GrpcStatus + GrpcPeerAddress + GrpcRequestDuration + GrpcRequestSize + GrpcResponseSize ) // Default to only labkit-specific fields, let middleware handle standard gRPC fields -const defaultGrpcEnabledFields = GrpcCorrelationID | GrpcSystem +const defaultGrpcEnabledFields = GrpcCorrelationID | GrpcSystem | GrpcRequestSize | GrpcResponseSize -- GitLab From ad44cae00dcc94c6d004f1e6b9bd68d1008a85fe Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Tue, 16 
Sep 2025 15:19:13 +0200 Subject: [PATCH 04/12] added reqsize --- log/grpc_access_logger.go | 42 ++++++++++++++------------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index c64c501a..c285dce5 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -3,11 +3,13 @@ package log import ( "context" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/protobuf/proto" ) func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { @@ -20,6 +22,19 @@ func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInte return extractLabkitFields(ctx, &config) }), logging.WithLevels(levelFromCode), + // Add custom payload size logging + logging.WithFieldsFromContextAndCallMeta(func(ctx context.Context, callMeta interceptors.CallMeta) logging.Fields { + var fields []any + + // Add request size if available + if callMeta.ReqOrNil != nil { + if msg, ok := callMeta.ReqOrNil.(proto.Message); ok { + fields = append(fields, "grpc.request.size", proto.Size(msg)) + } + } + + return fields + }), ) } @@ -96,33 +111,6 @@ func extractLabkitFields(ctx context.Context, config *grpcAccessLoggerConfig) lo return fields } -// func extractLabkitFields(ctx context.Context, config *grpcAccessLoggerConfig) logging.Fields { -// fields := make(logging.Fields) -// fieldsBitMask := config.fields -// -// // Add correlation ID if enabled -// if fieldsBitMask&GrpcCorrelationID != 0 { -// if corrID := correlation.ExtractFromContext(ctx); corrID != "" { -// fields[correlation.FieldName] = corrID -// } -// } -// -// // Add system field if enabled -// if fieldsBitMask&GrpcSystem != 0 { -// fields["system"] = "grpc" -// } -// -// // Add 
extra fields from user configuration -// if config.extraFields != nil { -// if extraFields := config.extraFields(ctx, ""); extraFields != nil { -// for k, v := range extraFields { -// fields[k] = v -// } -// } -// } -// -// return fields -// } func levelFromCode(c codes.Code) logging.Level { switch c { case codes.OK: -- GitLab From 241ff1e8d8e531bff8572aed10be821f609a1e80 Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Tue, 16 Sep 2025 17:15:30 +0200 Subject: [PATCH 05/12] use complex stuff --- log/grpc_access_logger.go | 249 +++++++++++++++++++++++++++++++++----- 1 file changed, 222 insertions(+), 27 deletions(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index c285dce5..8d6814ff 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -3,52 +3,230 @@ package log import ( "context" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) +// contextKey for storing request/response sizes +type contextKeySizes struct{} + +// MessageSizes holds request and response sizes +type MessageSizes struct { + RequestSize int + ResponseSize int +} + func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return logging.UnaryServerInterceptor( - InterceptorLogger(config.logger), - logging.WithLogOnEvents(logging.FinishCall), - logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { - return extractLabkitFields(ctx, &config) - }), - logging.WithLevels(levelFromCode), - // Add custom payload size logging - logging.WithFieldsFromContextAndCallMeta(func(ctx context.Context, callMeta interceptors.CallMeta) logging.Fields { - var fields []any - - // Add request size 
if available - if callMeta.ReqOrNil != nil { - if msg, ok := callMeta.ReqOrNil.(proto.Message); ok { - fields = append(fields, "grpc.request.size", proto.Size(msg)) + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + // Calculate request size + var requestSize int + if msg, ok := req.(proto.Message); ok { + requestSize = proto.Size(msg) + } + + // Call the actual handler + resp, err := handler(ctx, req) + + // Calculate response size + var responseSize int + if resp != nil { + if msg, ok := resp.(proto.Message); ok { + responseSize = proto.Size(msg) + } + } + + // Extract status code + code := codes.OK + if err != nil { + if st, ok := status.FromError(err); ok { + code = st.Code() + } + } + + // Log with sizes + logLevel := getLogLevel(code) + fields := logrus.Fields{ + "grpc.method": info.FullMethod, + "grpc.code": code.String(), + "grpc.request.size": requestSize, + "grpc.response.size": responseSize, + } + + // Add labkit fields + if config.fields&GrpcCorrelationID != 0 { + if corrID := correlation.ExtractFromContext(ctx); corrID != "" { + fields[correlation.FieldName] = corrID + } + } + + if config.fields&GrpcSystem != 0 { + fields["system"] = "grpc" + } + + // Add extra fields from user configuration + if config.extraFields != nil { + if extraFields := config.extraFields(ctx, info.FullMethod); extraFields != nil { + for k, v := range extraFields { + fields[k] = v } } + } - return fields - }), - ) + config.logger.WithFields(fields).Log(logLevel, "finished unary call") + + return resp, err + } } +// StreamServerInterceptor with message counting and size tracking func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return logging.StreamServerInterceptor( - InterceptorLogger(config.logger), - logging.WithLogOnEvents(logging.FinishCall), - logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { - 
return extractLabkitFields(ctx, &config) - }), - logging.WithLevels(levelFromCode), - ) + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Create a wrapped stream that tracks sizes + wrapped := &sizeTrackingServerStream{ + ServerStream: ss, + requestCount: 0, + responseCount: 0, + requestSize: 0, + responseSize: 0, + } + + // Call the actual handler + err := handler(srv, wrapped) + + // Extract status code + code := codes.OK + if err != nil { + if st, ok := status.FromError(err); ok { + code = st.Code() + } + } + + // Log with sizes + logLevel := getLogLevel(code) + fields := logrus.Fields{ + "grpc.method": info.FullMethod, + "grpc.code": code.String(), + "grpc.request.message_count": wrapped.requestCount, + "grpc.request.total_size": wrapped.requestSize, + "grpc.response.message_count": wrapped.responseCount, + "grpc.response.total_size": wrapped.responseSize, + } + + // Add labkit fields + ctx := ss.Context() + if config.fields&GrpcCorrelationID != 0 { + if corrID := correlation.ExtractFromContext(ctx); corrID != "" { + fields[correlation.FieldName] = corrID + } + } + + if config.fields&GrpcSystem != 0 { + fields["system"] = "grpc" + } + + // Add extra fields from user configuration + if config.extraFields != nil { + if extraFields := config.extraFields(ctx, info.FullMethod); extraFields != nil { + for k, v := range extraFields { + fields[k] = v + } + } + } + + config.logger.WithFields(fields).Log(logLevel, "finished streaming call") + + return err + } +} + +// sizeTrackingServerStream wraps grpc.ServerStream to track message sizes +type sizeTrackingServerStream struct { + grpc.ServerStream + requestCount int + responseCount int + requestSize int + responseSize int +} + +func (s *sizeTrackingServerStream) SendMsg(m any) error { + if msg, ok := m.(proto.Message); ok { + s.responseCount++ + s.responseSize += proto.Size(msg) + } + return s.ServerStream.SendMsg(m) +} + +func (s *sizeTrackingServerStream) 
RecvMsg(m any) error { + err := s.ServerStream.RecvMsg(m) + if err == nil { + if msg, ok := m.(proto.Message); ok { + s.requestCount++ + s.requestSize += proto.Size(msg) + } + } + return err +} + +// Alternative: If you still want to use the grpc-middleware logging package for consistency +func UnaryServerInterceptorWithLogging(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { + config := applyGrpcAccessLoggerOptions(opts) + + // Create a custom interceptor that wraps the logging middleware + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + // Store request size in context + var requestSize int + if msg, ok := req.(proto.Message); ok { + requestSize = proto.Size(msg) + } + + sizes := &MessageSizes{RequestSize: requestSize} + ctx = context.WithValue(ctx, contextKeySizes{}, sizes) + + // Wrap handler to capture response size + wrappedHandler := func(ctx context.Context, req any) (any, error) { + resp, err := handler(ctx, req) + + // Store response size + if sizes, ok := ctx.Value(contextKeySizes{}).(*MessageSizes); ok && resp != nil { + if msg, ok := resp.(proto.Message); ok { + sizes.ResponseSize = proto.Size(msg) + } + } + + return resp, err + } + + // Use the logging middleware with our wrapped handler + loggingInterceptor := logging.UnaryServerInterceptor( + InterceptorLogger(config.logger), + logging.WithLogOnEvents(logging.FinishCall), + logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { + fields := extractLabkitFields(ctx, &config) + + // Add sizes from context + if sizes, ok := ctx.Value(contextKeySizes{}).(*MessageSizes); ok { + fields = append(fields, + "grpc.request.size", sizes.RequestSize, + "grpc.response.size", sizes.ResponseSize, + ) + } + + return fields + }), + logging.WithLevels(levelFromCode), + ) + + return loggingInterceptor(ctx, req, info, wrappedHandler) + } } func InterceptorLogger(l *logrus.Logger) logging.Logger { @@ -127,3 +305,20 @@ func 
levelFromCode(c codes.Code) logging.Level { return logging.LevelError } } + +func getLogLevel(c codes.Code) logrus.Level { + switch c { + case codes.OK: + return logrus.InfoLevel + case codes.Canceled, codes.InvalidArgument, codes.DeadlineExceeded, + codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, + codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, + codes.OutOfRange, codes.Unimplemented: + return logrus.WarnLevel + case codes.Unknown, codes.Internal, codes.Unavailable, + codes.DataLoss, codes.Unauthenticated: + return logrus.ErrorLevel + default: + return logrus.ErrorLevel + } +} -- GitLab From 9af98b6e7841ac8ca5bb49e2f2c8a6d6dab8b8fb Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Tue, 16 Sep 2025 17:38:15 +0200 Subject: [PATCH 06/12] Revert "use complex stuff" This reverts commit 241ff1e8d8e531bff8572aed10be821f609a1e80. --- log/grpc_access_logger.go | 249 +++++--------------------------------- 1 file changed, 27 insertions(+), 222 deletions(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index 8d6814ff..c285dce5 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -3,230 +3,52 @@ package log import ( "context" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) -// contextKey for storing request/response sizes -type contextKeySizes struct{} - -// MessageSizes holds request and response sizes -type MessageSizes struct { - RequestSize int - ResponseSize int -} - func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - 
// Calculate request size - var requestSize int - if msg, ok := req.(proto.Message); ok { - requestSize = proto.Size(msg) - } - - // Call the actual handler - resp, err := handler(ctx, req) - - // Calculate response size - var responseSize int - if resp != nil { - if msg, ok := resp.(proto.Message); ok { - responseSize = proto.Size(msg) - } - } - - // Extract status code - code := codes.OK - if err != nil { - if st, ok := status.FromError(err); ok { - code = st.Code() - } - } - - // Log with sizes - logLevel := getLogLevel(code) - fields := logrus.Fields{ - "grpc.method": info.FullMethod, - "grpc.code": code.String(), - "grpc.request.size": requestSize, - "grpc.response.size": responseSize, - } - - // Add labkit fields - if config.fields&GrpcCorrelationID != 0 { - if corrID := correlation.ExtractFromContext(ctx); corrID != "" { - fields[correlation.FieldName] = corrID - } - } - - if config.fields&GrpcSystem != 0 { - fields["system"] = "grpc" - } - - // Add extra fields from user configuration - if config.extraFields != nil { - if extraFields := config.extraFields(ctx, info.FullMethod); extraFields != nil { - for k, v := range extraFields { - fields[k] = v + return logging.UnaryServerInterceptor( + InterceptorLogger(config.logger), + logging.WithLogOnEvents(logging.FinishCall), + logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { + return extractLabkitFields(ctx, &config) + }), + logging.WithLevels(levelFromCode), + // Add custom payload size logging + logging.WithFieldsFromContextAndCallMeta(func(ctx context.Context, callMeta interceptors.CallMeta) logging.Fields { + var fields []any + + // Add request size if available + if callMeta.ReqOrNil != nil { + if msg, ok := callMeta.ReqOrNil.(proto.Message); ok { + fields = append(fields, "grpc.request.size", proto.Size(msg)) } } - } - config.logger.WithFields(fields).Log(logLevel, "finished unary call") - - return resp, err - } + return fields + }), + ) } -// StreamServerInterceptor with message 
counting and size tracking func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - // Create a wrapped stream that tracks sizes - wrapped := &sizeTrackingServerStream{ - ServerStream: ss, - requestCount: 0, - responseCount: 0, - requestSize: 0, - responseSize: 0, - } - - // Call the actual handler - err := handler(srv, wrapped) - - // Extract status code - code := codes.OK - if err != nil { - if st, ok := status.FromError(err); ok { - code = st.Code() - } - } - - // Log with sizes - logLevel := getLogLevel(code) - fields := logrus.Fields{ - "grpc.method": info.FullMethod, - "grpc.code": code.String(), - "grpc.request.message_count": wrapped.requestCount, - "grpc.request.total_size": wrapped.requestSize, - "grpc.response.message_count": wrapped.responseCount, - "grpc.response.total_size": wrapped.responseSize, - } - - // Add labkit fields - ctx := ss.Context() - if config.fields&GrpcCorrelationID != 0 { - if corrID := correlation.ExtractFromContext(ctx); corrID != "" { - fields[correlation.FieldName] = corrID - } - } - - if config.fields&GrpcSystem != 0 { - fields["system"] = "grpc" - } - - // Add extra fields from user configuration - if config.extraFields != nil { - if extraFields := config.extraFields(ctx, info.FullMethod); extraFields != nil { - for k, v := range extraFields { - fields[k] = v - } - } - } - - config.logger.WithFields(fields).Log(logLevel, "finished streaming call") - - return err - } -} - -// sizeTrackingServerStream wraps grpc.ServerStream to track message sizes -type sizeTrackingServerStream struct { - grpc.ServerStream - requestCount int - responseCount int - requestSize int - responseSize int -} - -func (s *sizeTrackingServerStream) SendMsg(m any) error { - if msg, ok := m.(proto.Message); ok { - s.responseCount++ - s.responseSize += proto.Size(msg) - } 
- return s.ServerStream.SendMsg(m) -} - -func (s *sizeTrackingServerStream) RecvMsg(m any) error { - err := s.ServerStream.RecvMsg(m) - if err == nil { - if msg, ok := m.(proto.Message); ok { - s.requestCount++ - s.requestSize += proto.Size(msg) - } - } - return err -} - -// Alternative: If you still want to use the grpc-middleware logging package for consistency -func UnaryServerInterceptorWithLogging(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { - config := applyGrpcAccessLoggerOptions(opts) - - // Create a custom interceptor that wraps the logging middleware - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - // Store request size in context - var requestSize int - if msg, ok := req.(proto.Message); ok { - requestSize = proto.Size(msg) - } - - sizes := &MessageSizes{RequestSize: requestSize} - ctx = context.WithValue(ctx, contextKeySizes{}, sizes) - - // Wrap handler to capture response size - wrappedHandler := func(ctx context.Context, req any) (any, error) { - resp, err := handler(ctx, req) - - // Store response size - if sizes, ok := ctx.Value(contextKeySizes{}).(*MessageSizes); ok && resp != nil { - if msg, ok := resp.(proto.Message); ok { - sizes.ResponseSize = proto.Size(msg) - } - } - - return resp, err - } - - // Use the logging middleware with our wrapped handler - loggingInterceptor := logging.UnaryServerInterceptor( - InterceptorLogger(config.logger), - logging.WithLogOnEvents(logging.FinishCall), - logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { - fields := extractLabkitFields(ctx, &config) - - // Add sizes from context - if sizes, ok := ctx.Value(contextKeySizes{}).(*MessageSizes); ok { - fields = append(fields, - "grpc.request.size", sizes.RequestSize, - "grpc.response.size", sizes.ResponseSize, - ) - } - - return fields - }), - logging.WithLevels(levelFromCode), - ) - - return loggingInterceptor(ctx, req, info, wrappedHandler) - } + return 
logging.StreamServerInterceptor( + InterceptorLogger(config.logger), + logging.WithLogOnEvents(logging.FinishCall), + logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { + return extractLabkitFields(ctx, &config) + }), + logging.WithLevels(levelFromCode), + ) } func InterceptorLogger(l *logrus.Logger) logging.Logger { @@ -305,20 +127,3 @@ func levelFromCode(c codes.Code) logging.Level { return logging.LevelError } } - -func getLogLevel(c codes.Code) logrus.Level { - switch c { - case codes.OK: - return logrus.InfoLevel - case codes.Canceled, codes.InvalidArgument, codes.DeadlineExceeded, - codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, - codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, - codes.OutOfRange, codes.Unimplemented: - return logrus.WarnLevel - case codes.Unknown, codes.Internal, codes.Unavailable, - codes.DataLoss, codes.Unauthenticated: - return logrus.ErrorLevel - default: - return logrus.ErrorLevel - } -} -- GitLab From c112fd5acc4ae5aa72f7ebf748dbffe76a22433e Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 13:32:32 +0200 Subject: [PATCH 07/12] message sizes --- log/grpc_access_logger.go | 113 +++++++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 31 deletions(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index c285dce5..82318d9c 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -3,7 +3,6 @@ package log import ( "context" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/labkit/correlation" @@ -15,43 +14,74 @@ import ( func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return logging.UnaryServerInterceptor( - InterceptorLogger(config.logger), - logging.WithLogOnEvents(logging.FinishCall), - 
logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { - return extractLabkitFields(ctx, &config) - }), - logging.WithLevels(levelFromCode), - // Add custom payload size logging - logging.WithFieldsFromContextAndCallMeta(func(ctx context.Context, callMeta interceptors.CallMeta) logging.Fields { - var fields []any - - // Add request size if available - if callMeta.ReqOrNil != nil { - if msg, ok := callMeta.ReqOrNil.(proto.Message); ok { - fields = append(fields, "grpc.request.size", proto.Size(msg)) - } - } + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + resp, err := handler(ctx, req) + + fields := extractLabkitFields(ctx, &config) + logProtoMessageSize(&fields, "grpc.request.size", req) + if err == nil { + logProtoMessageSize(&fields, "grpc.response.size", resp) + } - return fields - }), - ) + logger := InterceptorLogger(config.logger, fields) + + logging.UnaryServerInterceptor( + logger, + logging.WithLogOnEvents(logging.FinishCall), + logging.WithLevels(levelFromCode), + )(ctx, req, info, func(ctx context.Context, req any) (any, error) { + return resp, err + }) + + return resp, err + } +} + +type wrappedStream struct { + grpc.ServerStream + fields logging.Fields +} + +func (w *wrappedStream) RecvMsg(m any) error { + err := w.ServerStream.RecvMsg(m) + + if err == nil { + logProtoMessageSize(&w.fields, "grpc.request.size", m) + } + + return err +} + +func (w *wrappedStream) SendMsg(m any) error { + logProtoMessageSize(&w.fields, "grpc.response.size", m) + + return w.ServerStream.SendMsg(m) } func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerInterceptor { config := applyGrpcAccessLoggerOptions(opts) - return logging.StreamServerInterceptor( - InterceptorLogger(config.logger), - logging.WithLogOnEvents(logging.FinishCall), - logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields { - return extractLabkitFields(ctx, &config) - }), - 
logging.WithLevels(levelFromCode), - ) + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + fields := extractLabkitFields(ss.Context(), &config) + wrapped := &wrappedStream{ss, fields} + + err := handler(srv, wrapped) + + logger := InterceptorLogger(config.logger, wrapped.fields) + + logging.StreamServerInterceptor( + logger, + logging.WithLogOnEvents(logging.FinishCall), + logging.WithLevels(levelFromCode), + )(srv, ss, info, func(srv any, stream grpc.ServerStream) error { + return err + }) + + return err + } } -func InterceptorLogger(l *logrus.Logger) logging.Logger { +func InterceptorLogger(l *logrus.Logger, extraFields logging.Fields) logging.Logger { return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { logrusFields := make(logrus.Fields) @@ -59,11 +89,26 @@ func InterceptorLogger(l *logrus.Logger) logging.Logger { for i := 0; i < len(fields); i += 2 { if i+1 < len(fields) { if key, ok := fields[i].(string); ok { + if key == "peer.address" { + key = "grpc.peer.address" + } + if key == "grpc.time_ms" { + key = "duration_ms" + } logrusFields[key] = fields[i+1] } } } + // Add extra fields + for i := 0; i < len(extraFields); i += 2 { + if i+1 < len(extraFields) { + if key, ok := extraFields[i].(string); ok { + logrusFields[key] = extraFields[i+1] + } + } + } + // Convert logging.Level to logrus.Level var logrusLevel logrus.Level switch lvl { @@ -79,7 +124,7 @@ func InterceptorLogger(l *logrus.Logger) logging.Logger { logrusLevel = logrus.InfoLevel } - l.WithFields(logrusFields).Log(logrusLevel, msg) + l.WithFields(logrusFields).Log(logrusLevel, "access") }) } @@ -127,3 +172,9 @@ func levelFromCode(c codes.Code) logging.Level { return logging.LevelError } } + +func logProtoMessageSize(fields *logging.Fields, key string, msg any) { + if p, ok := msg.(proto.Message); ok { + *fields = append(*fields, key, proto.Size(p)) + } +} -- GitLab From 
fa258fc3fde3d50669d1796944dee61d21a05726 Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 14:05:00 +0200 Subject: [PATCH 08/12] feat: implement grpc access logger test --- log/grpc_access_logger_test.go | 276 ++++++++------------------------- 1 file changed, 67 insertions(+), 209 deletions(-) diff --git a/log/grpc_access_logger_test.go b/log/grpc_access_logger_test.go index fcedbdf1..85585526 100644 --- a/log/grpc_access_logger_test.go +++ b/log/grpc_access_logger_test.go @@ -3,157 +3,69 @@ package log import ( "bytes" "context" - "io" - "net" - "strings" "testing" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/status" ) -const ( - grpcTestReqSize = 10 - grpcTestRespSize = 20 -) - -func TestUnaryServerInterceptor(t *testing.T) { +func TestGrpcAccessLogger(t *testing.T) { tests := []struct { name string + unary bool + correlation string + err error logMatchers []string - opts []GrpcAccessLoggerOption - handler func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) }{ { - name: "trivial", + name: "unary success", + unary: true, logMatchers: []string{ - `"grpc.service":"grpc.testing.TestService"`, - `"grpc.method":"UnaryCall"`, - `"grpc.code":"OK"`, - `"grpc.peer.address":".*"`, - `"duration_ms":\d+`, - `"system":"grpc"`, - `"correlation_id":".*"`, - `"grpc.request.size":\d+`, - `"grpc.response.size":\d+`, + `\btime=\"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}`, + `\blevel=info`, + `\bmsg=access`, + `\bduration_ms=\d+`, + `\bgrpc.service=TestService`, + `\bgrpc.method=TestMethod`, + `\bgrpc.code=OK`, + `\bsystem=grpc`, }, }, { - name: "error", - handler: func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { - return nil, 
status.Error(codes.NotFound, "not found") - }, + name: "unary error", + unary: true, + err: status.Error(codes.Internal, "internal error"), logMatchers: []string{ - `"grpc.code":"NotFound"`, + `\blevel=error`, + `\bgrpc.code=Internal`, }, }, { - name: "extra_fields", - opts: []GrpcAccessLoggerOption{ - WithGrpcExtraFields(func(ctx context.Context, method string) logrus.Fields { - return logrus.Fields{"testfield": "testvalue"} - }), - }, - logMatchers: []string{ - `"testfield":"testvalue"`, - }, - }, - { - name: "excluded_fields", - opts: []GrpcAccessLoggerOption{ - WithGrpcFieldsExcluded(GrpcService | GrpcMethod | GrpcRequestSize | GrpcResponseSize), - }, + name: "unary with correlation", + unary: true, + correlation: "test-correlation-id", logMatchers: []string{ - `"grpc.code":"OK"`, - `"grpc.peer.address":".*"`, - `"duration_ms":\d+`, - `"system":"grpc"`, - `"correlation_id":".*"`, + `\bcorrelation_id=test-correlation-id`, }, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - buf := &bytes.Buffer{} - logger := newLogger(buf) - opts := tt.opts - opts = append(opts, WithGrpcAccessLogger(logger)) - - handler := tt.handler - if handler == nil { - handler = func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { - return &grpc_testing.SimpleResponse{Payload: &grpc_testing.Payload{Body: make([]byte, grpcTestRespSize)}}, nil - } - } - - server := newTestGrpcServer(UnaryServerInterceptor(opts...)) - server.service.UnaryCallF = handler - - conn := newTestGrpcClient(t, server.listener.Addr().String()) - client := grpc_testing.NewTestServiceClient(conn) - - ctx := correlation.ContextWithCorrelation(context.Background(), "test-correlation-id") - _, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{ - Payload: &grpc_testing.Payload{Body: make([]byte, grpcTestReqSize)}, - }) - - if strings.Contains(tt.name, "error") { - require.Error(t, err) - s := status.Convert(err) - require.Equal(t, 
codes.NotFound, s.Code()) - } else { - require.NoError(t, err) - } - - logString := buf.String() - require.Contains(t, logString, `"msg":"access"`) - for _, v := range tt.logMatchers { - require.Regexp(t, v, logString) - } - - if strings.Contains(tt.name, "excluded_fields") { - require.NotContains(t, logString, `"grpc.service"`) - require.NotContains(t, logString, `"grpc.method"`) - require.NotContains(t, logString, `"grpc.request.size"`) - require.NotContains(t, logString, `"grpc.response.size"`) - } - }) - } -} - -func TestStreamServerInterceptor(t *testing.T) { - tests := []struct { - name string - logMatchers []string - opts []GrpcAccessLoggerOption - handler func(stream grpc_testing.TestService_FullDuplexCallServer) error - }{ { - name: "trivial", + name: "stream success", + unary: false, logMatchers: []string{ - `"grpc.service":"grpc.testing.TestService"`, - `"grpc.method":"FullDuplexCall"`, - `"grpc.code":"OK"`, - `"grpc.peer.address":".*"`, - `"duration_ms":\d+`, - `"system":"grpc"`, - `"correlation_id":".*"`, + `\blevel=info`, + `\bgrpc.code=OK`, }, }, { - name: "error", - handler: func(stream grpc_testing.TestService_FullDuplexCallServer) error { - return status.Error(codes.NotFound, "not found") - }, + name: "stream error", + unary: false, + err: status.Error(codes.Internal, "internal error"), logMatchers: []string{ - `"grpc.code":"NotFound"`, + `\blevel=error`, + `\bgrpc.code=Internal`, }, }, } @@ -161,39 +73,43 @@ func TestStreamServerInterceptor(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { buf := &bytes.Buffer{} - logger := newLogger(buf) - opts := tt.opts - opts = append(opts, WithGrpcAccessLogger(logger)) + logger := New() + _, err := Initialize(WithLogger(logger), WithWriter(buf)) + require.NoError(t, err) - handler := tt.handler - if handler == nil { - handler = func(stream grpc_testing.TestService_FullDuplexCallServer) error { - return nil - } + opts := []GrpcAccessLoggerOption{WithGrpcAccessLogger(logger)} + ctx 
:= context.Background() + if tt.correlation != "" { + ctx = correlation.ContextWithCorrelation(ctx, tt.correlation) } - server := newTestGrpcServer(nil, StreamServerInterceptor(opts...)) - server.service.FullDuplexCallF = handler - - conn := newTestGrpcClient(t, server.listener.Addr().String()) - client := grpc_testing.NewTestServiceClient(conn) - - ctx := correlation.ContextWithCorrelation(context.Background(), "test-correlation-id") - stream, err := client.FullDuplexCall(ctx) - require.NoError(t, err) + if tt.unary { + info := &grpc.UnaryServerInfo{FullMethod: "/TestService/TestMethod"} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return "hello", tt.err + } - // We need to receive to ensure the stream handler has completed - _, err = stream.Recv() - if strings.Contains(tt.name, "error") { - require.Error(t, err) - s, _ := status.FromError(err) - require.Equal(t, codes.NotFound, s.Code()) + _, err := UnaryServerInterceptor(opts...)(ctx, "world", info, handler) + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } } else { - require.ErrorIs(t, err, io.EOF) + info := &grpc.StreamServerInfo{FullMethod: "/TestService/TestMethod"} + handler := func(srv interface{}, stream grpc.ServerStream) error { + return tt.err + } + + err := StreamServerInterceptor(opts...)(nil, &mockServerStream{ctx: ctx}, info, handler) + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } } logString := buf.String() - require.Contains(t, logString, `"msg":"access"`) for _, v := range tt.logMatchers { require.Regexp(t, v, logString) } @@ -201,69 +117,11 @@ func TestStreamServerInterceptor(t *testing.T) { } } -func newLogger(w io.Writer) *logrus.Logger { - logger := logrus.New() - logger.Out = w - logger.Formatter = &logrus.JSONFormatter{} - return logger +type mockServerStream struct { + grpc.ServerStream + ctx context.Context } -type testService struct { - 
grpc_testing.UnimplementedTestServiceServer - UnaryCallF func(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) - FullDuplexCallF func(stream grpc_testing.TestService_FullDuplexCallServer) error -} - -func (s *testService) UnaryCall(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { - if s.UnaryCallF != nil { - return s.UnaryCallF(ctx, req) - } - return &grpc_testing.SimpleResponse{}, nil -} - -func (s *testService) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error { - if s.FullDuplexCallF != nil { - return s.FullDuplexCallF(stream) - } - return nil -} - -type testGrpcServer struct { - listener net.Listener - server *grpc.Server - service *testService -} - -func newTestGrpcServer(unaryInterceptor grpc.UnaryServerInterceptor, streamInterceptor ...grpc.StreamServerInterceptor) *testGrpcServer { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - panic(err) - } - - service := &testService{} - opts := []grpc.ServerOption{} - if unaryInterceptor != nil { - opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor)) - } - if len(streamInterceptor) > 0 { - opts = append(opts, grpc.StreamInterceptor(streamInterceptor[0])) - } - - server := grpc.NewServer(opts...) 
- grpc_testing.RegisterTestServiceServer(server, service) - - go server.Serve(listener) - - return &testGrpcServer{ - listener: listener, - server: server, - service: service, - } -} - -func newTestGrpcClient(t *testing.T, addr string) *grpc.ClientConn { - t.Helper() - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - return conn -} +func (m *mockServerStream) Context() context.Context { + return m.ctx +} \ No newline at end of file -- GitLab From fc09dac6c591b5e510721a453e72bec5eda2180a Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 14:12:34 +0200 Subject: [PATCH 09/12] new line --- log/grpc_access_logger_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/log/grpc_access_logger_test.go b/log/grpc_access_logger_test.go index 85585526..17445503 100644 --- a/log/grpc_access_logger_test.go +++ b/log/grpc_access_logger_test.go @@ -124,4 +124,5 @@ type mockServerStream struct { func (m *mockServerStream) Context() context.Context { return m.ctx -} \ No newline at end of file +} + -- GitLab From 385d7191abfd7fe27a2c1a8d0c024a088ab2a158 Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 14:13:24 +0200 Subject: [PATCH 10/12] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 121a0e9f..dda720d3 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( golang.org/x/crypto v0.41.0 google.golang.org/api v0.54.0 google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.36.6 gopkg.in/DataDog/dd-trace-go.v1 v1.32.0 ) @@ -74,6 +75,5 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) -- GitLab From 4b47a75fd79a62cb1bae782a0d07352dfa11d406 Mon Sep 
17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 14:22:22 +0200 Subject: [PATCH 11/12] fix linting --- log/grpc_access_logger.go | 39 ++++++++++++++++++------------- log/grpc_access_logger_fields.go | 10 ++++---- log/grpc_access_logger_options.go | 15 ++++++------ log/grpc_access_logger_test.go | 21 ++++++----------- 4 files changed, 43 insertions(+), 42 deletions(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index 82318d9c..9d0b380f 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -25,7 +25,7 @@ func UnaryServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.UnaryServerInte logger := InterceptorLogger(config.logger, fields) - logging.UnaryServerInterceptor( + _, _ = logging.UnaryServerInterceptor( logger, logging.WithLogOnEvents(logging.FinishCall), logging.WithLevels(levelFromCode), @@ -69,7 +69,7 @@ func StreamServerInterceptor(opts ...GrpcAccessLoggerOption) grpc.StreamServerIn logger := InterceptorLogger(config.logger, wrapped.fields) - logging.StreamServerInterceptor( + _ = logging.StreamServerInterceptor( logger, logging.WithLogOnEvents(logging.FinishCall), logging.WithLevels(levelFromCode), @@ -87,26 +87,33 @@ func InterceptorLogger(l *logrus.Logger, extraFields logging.Fields) logging.Log // Convert fields to logrus format for i := 0; i < len(fields); i += 2 { - if i+1 < len(fields) { - if key, ok := fields[i].(string); ok { - if key == "peer.address" { - key = "grpc.peer.address" - } - if key == "grpc.time_ms" { - key = "duration_ms" - } - logrusFields[key] = fields[i+1] - } + if i+1 >= len(fields) { + break } + key, ok := fields[i].(string) + if !ok { + continue + } + + if key == "peer.address" { + key = "grpc.peer.address" + } + if key == "grpc.time_ms" { + key = "duration_ms" + } + logrusFields[key] = fields[i+1] } // Add extra fields for i := 0; i < len(extraFields); i += 2 { - if i+1 < len(extraFields) { - if key, ok := extraFields[i].(string); ok { - logrusFields[key] = 
extraFields[i+1] - } + if i+1 >= len(extraFields) { + break + } + key, ok := extraFields[i].(string) + if !ok { + continue } + logrusFields[key] = extraFields[i+1] } // Convert logging.Level to logrus.Level diff --git a/log/grpc_access_logger_fields.go b/log/grpc_access_logger_fields.go index be17e299..9bc36cdd 100644 --- a/log/grpc_access_logger_fields.go +++ b/log/grpc_access_logger_fields.go @@ -1,11 +1,11 @@ package log -// GrpcAccessLogField is used to select which fields are recorded in the access log +// GrpcAccessLogField is used to select which fields are recorded in the access log. type GrpcAccessLogField uint16 -// GrpcAccessLogField defines bit flags for configuring which fields to include in gRPC access logs -// Combine multiple fields using bitwise OR: logConfig := GrpcCorrelationID | GrpcService | GrpcStatus -// Check if field is enabled using AND: if logConfig & GrpcService != 0 { ... } +// GrpcAccessLogField defines bit flags for configuring which fields to include in gRPC access logs. +// Combine multiple fields using bitwise OR: logConfig := GrpcCorrelationID | GrpcService | GrpcStatus. +// Check if field is enabled using AND: if logConfig & GrpcService != 0 { ... }. const ( GrpcCorrelationID GrpcAccessLogField = 1 << iota GrpcSystem @@ -18,5 +18,5 @@ const ( GrpcResponseSize ) -// Default to only labkit-specific fields, let middleware handle standard gRPC fields +// Default to only labkit-specific fields, let middleware handle standard gRPC fields. 
const defaultGrpcEnabledFields = GrpcCorrelationID | GrpcSystem | GrpcRequestSize | GrpcResponseSize diff --git a/log/grpc_access_logger_options.go b/log/grpc_access_logger_options.go index 0fd44c14..624b6f07 100644 --- a/log/grpc_access_logger_options.go +++ b/log/grpc_access_logger_options.go @@ -2,14 +2,15 @@ package log import ( "context" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/sirupsen/logrus" ) -// GrpcExtraFieldsGeneratorFunc allows extra fields to be included in the access log +// GrpcExtraFieldsGeneratorFunc allows extra fields to be included in the access log. type GrpcExtraFieldsGeneratorFunc func(ctx context.Context, fullMethodName string) Fields -// grpcAccessLoggerConfig holds the configuration for gRPC access logging +// grpcAccessLoggerConfig holds the configuration for gRPC access logging. type grpcAccessLoggerConfig struct { logger *logrus.Logger extraFields GrpcExtraFieldsGeneratorFunc @@ -17,7 +18,7 @@ type grpcAccessLoggerConfig struct { logEvents []logging.LoggableEvent // New: control when to log } -// GrpcAccessLoggerOption configures the gRPC access logger +// GrpcAccessLoggerOption configures the gRPC access logger. type GrpcAccessLoggerOption func(*grpcAccessLoggerConfig) func applyGrpcAccessLoggerOptions(opts []GrpcAccessLoggerOption) grpcAccessLoggerConfig { @@ -35,28 +36,28 @@ func applyGrpcAccessLoggerOptions(opts []GrpcAccessLoggerOption) grpcAccessLogge return config } -// WithGrpcExtraFields allows extra fields to be added to the access log +// WithGrpcExtraFields allows extra fields to be added to the access log. func WithGrpcExtraFields(f GrpcExtraFieldsGeneratorFunc) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.extraFields = f } } -// WithGrpcFieldsExcluded excludes specific fields from the access log +// WithGrpcFieldsExcluded excludes specific fields from the access log. 
func WithGrpcFieldsExcluded(fields GrpcAccessLogField) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.fields &^= fields } } -// WithGrpcAccessLogger sets a custom logger for the access logger +// WithGrpcAccessLogger sets a custom logger for the access logger. func WithGrpcAccessLogger(logger *logrus.Logger) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.logger = logger } } -// WithGrpcLogEvents controls when to log (start, finish, or both) +// WithGrpcLogEvents controls when to log (start, finish, or both). func WithGrpcLogEvents(events ...logging.LoggableEvent) GrpcAccessLoggerOption { return func(config *grpcAccessLoggerConfig) { config.logEvents = events diff --git a/log/grpc_access_logger_test.go b/log/grpc_access_logger_test.go index 17445503..2a3b88fb 100644 --- a/log/grpc_access_logger_test.go +++ b/log/grpc_access_logger_test.go @@ -88,25 +88,19 @@ func TestGrpcAccessLogger(t *testing.T) { handler := func(ctx context.Context, req interface{}) (interface{}, error) { return "hello", tt.err } - - _, err := UnaryServerInterceptor(opts...)(ctx, "world", info, handler) - if tt.err != nil { - require.EqualError(t, err, tt.err.Error()) - } else { - require.NoError(t, err) - } + _, err = UnaryServerInterceptor(opts...)(ctx, "world", info, handler) } else { info := &grpc.StreamServerInfo{FullMethod: "/TestService/TestMethod"} handler := func(srv interface{}, stream grpc.ServerStream) error { return tt.err } + err = StreamServerInterceptor(opts...)(nil, &mockServerStream{ctx: ctx}, info, handler) + } - err := StreamServerInterceptor(opts...)(nil, &mockServerStream{ctx: ctx}, info, handler) - if tt.err != nil { - require.EqualError(t, err, tt.err.Error()) - } else { - require.NoError(t, err) - } + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) } logString := buf.String() @@ -125,4 +119,3 @@ type mockServerStream struct { func (m *mockServerStream) 
Context() context.Context { return m.ctx } - -- GitLab From db1ff0c6cc7fd3b09a948c4805f09479cc28b920 Mon Sep 17 00:00:00 2001 From: Vladimir Glafirov Date: Wed, 17 Sep 2025 16:11:28 +0200 Subject: [PATCH 12/12] applied Duo suggestion --- log/grpc_access_logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log/grpc_access_logger.go b/log/grpc_access_logger.go index 9d0b380f..b22f9d08 100644 --- a/log/grpc_access_logger.go +++ b/log/grpc_access_logger.go @@ -153,7 +153,7 @@ func extractLabkitFields(ctx context.Context, config *grpcAccessLoggerConfig) lo // Add extra fields from user configuration if config.extraFields != nil { - if extraFields := config.extraFields(ctx, ""); extraFields != nil { + if extraFields := config.extraFields(ctx, fullMethodName); extraFields != nil { for k, v := range extraFields { fields = append(fields, k, v) } -- GitLab