From 43fcf69139ef19a41c047f38d230ad5c4f22c18a Mon Sep 17 00:00:00 2001 From: matst80 Date: Fri, 7 Nov 2025 14:20:54 +0100 Subject: [PATCH] more traces --- pkg/actor/grpc_server.go | 114 +++++++++++++++++++++++++++++++++++++-- pkg/proxy/remotehost.go | 28 +++++++++- 2 files changed, 137 insertions(+), 5 deletions(-) diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go index 8db40e8..2a262ac 100644 --- a/pkg/actor/grpc_server.go +++ b/pkg/actor/grpc_server.go @@ -8,6 +8,10 @@ import ( "time" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -19,9 +23,73 @@ type ControlServer[V any] struct { pool GrainPool[V] } +const name = "grpc_server" + +var ( + tracer = otel.Tracer(name) + meter = otel.Meter(name) + logger = otelslog.NewLogger(name) + pingCalls metric.Int64Counter + negotiateCalls metric.Int64Counter + getLocalActorIdsCalls metric.Int64Counter + announceOwnershipCalls metric.Int64Counter + announceExpiryCalls metric.Int64Counter + closingCalls metric.Int64Counter +) + +func init() { + var err error + pingCalls, err = meter.Int64Counter("grpc.ping_calls", + metric.WithDescription("Number of ping calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } + negotiateCalls, err = meter.Int64Counter("grpc.negotiate_calls", + metric.WithDescription("Number of negotiate calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } + getLocalActorIdsCalls, err = meter.Int64Counter("grpc.get_local_actor_ids_calls", + metric.WithDescription("Number of get local actor ids calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } + announceOwnershipCalls, err = meter.Int64Counter("grpc.announce_ownership_calls", + metric.WithDescription("Number of announce ownership calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } + announceExpiryCalls, err = meter.Int64Counter("grpc.announce_expiry_calls", + metric.WithDescription("Number of announce expiry calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } + closingCalls, err = meter.Int64Counter("grpc.closing_calls", + metric.WithDescription("Number of closing calls"), + metric.WithUnit("{calls}")) + if err != nil { + panic(err) + } +} + func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { + ctx, span := tracer.Start(ctx, "grpc_announce_ownership") + defer span.End() + span.SetAttributes( + attribute.String("host", req.Host), + attribute.Int("id_count", len(req.Ids)), + ) + logger.InfoContext(ctx, "announce ownership", "host", req.Host, "id_count", len(req.Ids)) + announceOwnershipCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host))) + err := s.pool.HandleOwnershipChange(req.Host, req.Ids) if err != nil { + span.RecordError(err) return &messages.OwnerChangeAck{ Accepted: false, Message: "owner change failed", @@ -36,7 +104,19 @@ func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages. } func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { + ctx, span := tracer.Start(ctx, "grpc_announce_expiry") + defer span.End() + span.SetAttributes( + attribute.String("host", req.Host), + attribute.Int("id_count", len(req.Ids)), + ) + logger.InfoContext(ctx, "announce expiry", "host", req.Host, "id_count", len(req.Ids)) + announceExpiryCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host))) + err := s.pool.HandleRemoteExpiry(req.Host, req.Ids) + if err != nil { + span.RecordError(err) + } return &messages.OwnerChangeAck{ Accepted: err == nil, Message: "expiry acknowledged", @@ -45,15 +125,27 @@ func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.Exp // ControlPlane: Ping func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { + ctx, span := tracer.Start(ctx, "grpc_ping") + defer span.End() + host := s.pool.Hostname() + span.SetAttributes(attribute.String("host", host)) + logger.InfoContext(ctx, "ping received", "host", host) + pingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host))) + // log.Printf("got ping") return &messages.PingReply{ - Host: s.pool.Hostname(), + Host: host, UnixTime: time.Now().Unix(), }, nil } // ControlPlane: Negotiate (merge host views) func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { + ctx, span := tracer.Start(ctx, "grpc_negotiate") + defer span.End() + span.SetAttributes(attribute.Int("known_hosts_count", len(req.KnownHosts))) + logger.InfoContext(ctx, "negotiate", "known_hosts_count", len(req.KnownHosts)) + negotiateCalls.Add(ctx, 1) s.pool.Negotiate(req.KnownHosts) return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil @@ -61,13 +153,27 @@ func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.Negotiat // ControlPlane: GetCartIds (locally owned carts only) func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) { - return &messages.ActorIdsReply{Ids: s.pool.GetLocalIds()}, nil + ctx, span := tracer.Start(ctx, "grpc_get_local_actor_ids") + defer span.End() + ids := s.pool.GetLocalIds() + span.SetAttributes(attribute.Int("id_count", len(ids))) + logger.InfoContext(ctx, "get local actor ids", "id_count", len(ids)) + getLocalActorIdsCalls.Add(ctx, 1) + + return &messages.ActorIdsReply{Ids: ids}, nil } // ControlPlane: Closing (peer shutdown notification) func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { - if req.GetHost() != "" { - s.pool.RemoveHost(req.GetHost()) + ctx, span := tracer.Start(ctx, "grpc_closing") + defer span.End() + host := req.GetHost() + span.SetAttributes(attribute.String("host", host)) + logger.InfoContext(ctx, "closing notice", "host", host) + closingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host))) + + if host != "" { + s.pool.RemoveHost(host) } return &messages.OwnerChangeAck{ Accepted: true, diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index aa4eded..7445bb8 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -10,6 +10,9 @@ import ( "time" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -26,6 +29,14 @@ type RemoteHost struct { missedPings int } +const name = "proxy" + +var ( + tracer = otel.Tracer(name) + meter = otel.Meter(name) + logger = otelslog.NewLogger(name) +) + func NewRemoteHost(host string) (*RemoteHost, error) { target := fmt.Sprintf("%s:1337", host) @@ -147,8 +158,20 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) log.Printf("proxy target: %s, method: %s", target, r.Method) - req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) + + ctx, span := tracer.Start(r.Context(), "remote_proxy") + defer span.End() + span.SetAttributes( + attribute.String("cartid", fmt.Sprintf("%d", id)), + attribute.String("host", h.host), + attribute.String("method", r.Method), + attribute.String("target", target), + ) + logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method) + + req, err := http.NewRequestWithContext(ctx, r.Method, target, r.Body) if err != nil { + span.RecordError(err) http.Error(w, "proxy build error", http.StatusBadGateway) return false, err } @@ -162,10 +185,12 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b } res, err := h.client.Do(req) if err != nil { + span.RecordError(err) http.Error(w, "proxy request error", http.StatusBadGateway) return false, err } defer res.Body.Close() + span.SetAttributes(attribute.Int("status_code", res.StatusCode)) for k, v := range res.Header { for _, vv := range v { w.Header().Add(k, vv) @@ -176,6 +201,7 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b w.WriteHeader(res.StatusCode) _, copyErr := io.Copy(w, res.Body) if copyErr != nil { + span.RecordError(copyErr) return true, copyErr } return true, nil