diff --git a/rpc-pool.go b/rpc-pool.go index bac577e..79ef36a 100644 --- a/rpc-pool.go +++ b/rpc-pool.go @@ -4,6 +4,10 @@ import ( "fmt" "strings" "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type RemoteGrainPool struct { @@ -41,13 +45,36 @@ func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) { }, nil } +var ( + remoteCartLatency = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_remote_grain_calls_total_latency", + Help: "The total latency of remote grains", + }) + remoteCartCallsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_remote_grain_calls_total", + Help: "The total number of calls to remote grains", + }) +) + +func MeasureLatency(fn func() ([]byte, error)) ([]byte, error) { + start := time.Now() + data, err := fn() + if err != nil { + return data, err + } + elapsed := time.Since(start).Milliseconds() + remoteCartLatency.Add(float64(elapsed)) + remoteCartCallsTotal.Inc() + return data, nil +} + func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { data, err := GetData(message.Write) if err != nil { return nil, err } - reply, err := g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) + reply, err := MeasureLatency(func() ([]byte, error) { return g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) }) if err != nil { return nil, err @@ -61,7 +88,7 @@ func (g *RemoteGrain) GetId() CartId { } func (g *RemoteGrain) GetCurrentState() ([]byte, error) { - return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{}) + return MeasureLatency(func() ([]byte, error) { return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{}) }) } func NewRemoteGrainPool(addr string) *RemoteGrainPool { diff --git a/rpc-server.go b/rpc-server.go index bf52f4c..2de1be3 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -41,7 +41,9 @@ func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint3 fmt.Println("Error reading message:", err) return RemoteHandleMutationReply, nil, err } + replyData, err := h.pool.Process(id, msg) + if err != nil { fmt.Println("Error handling message:", err) }