Files
go-cart-actor/rpc-pool.go
matst80 c57b8a2c53
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m51s
measure remote grain latency
2024-11-11 13:35:03 +01:00

151 lines
3.1 KiB
Go

package main
import (
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// RemoteGrainPool keeps the RemoteGrain instances created for a single remote
// host, keyed by cart id. Access to the grains map is guarded by mu.
type RemoteGrainPool struct {
// mu guards grains.
mu sync.RWMutex
// Host is the remote host handed to NewRemoteGrain when a grain is created.
Host string
// grains holds the remote grains created so far, keyed by cart id.
grains map[CartId]*RemoteGrain
}
// String returns the id's bytes as text with NUL bytes stripped from both
// ends. NUL bytes in the middle of the id are kept.
func (id CartId) String() string {
	const padding = "\x00"
	text := string(id[:])
	return strings.Trim(text, padding)
}
// ToCartId copies the bytes of id into a CartId. Input longer than the id's
// 16 bytes is truncated; shorter input leaves the remaining bytes zero.
func ToCartId(id string) CartId {
	var cartID CartId
	copy(cartID[:], id)
	return cartID
}
// RemoteGrain pairs a cart id with the client used to reach it on a remote
// host. *CartClient is embedded, so the client's methods are promoted onto the
// grain (the grain's own methods invoke g.Call, presumably provided this way).
type RemoteGrain struct {
*CartClient
// Id is the cart this grain stands for.
Id CartId
// Host is the host name given to NewRemoteGrain, stored without the port.
Host string
}
// NewRemoteGrain dials host on port 1337 via CartDial and returns a grain for
// id backed by the resulting client. If dialing fails, the error from CartDial
// is returned unchanged and no grain is created.
func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) {
	addr := fmt.Sprintf("%s:1337", host)
	conn, dialErr := CartDial(addr)
	if dialErr != nil {
		return nil, dialErr
	}
	remote := &RemoteGrain{
		CartClient: conn,
		Host:       host,
		Id:         id,
	}
	return remote, nil
}
// Prometheus counters describing calls made to remote grains. Both are
// updated by MeasureLatency, and only for calls that return without error.
var (
// remoteCartLatency accumulates the elapsed time of successful remote calls,
// added in whole milliseconds.
remoteCartLatency = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_grain_calls_total_latency",
Help: "The total latency of remote grains",
})
// remoteCartCallsTotal is incremented once per successful remote call.
remoteCartCallsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_grain_calls_total",
Help: "The total number of calls to remote grains",
})
)
// MeasureLatency runs fn and returns its results unchanged. When fn succeeds,
// the elapsed wall-clock time in whole milliseconds is added to
// remoteCartLatency and remoteCartCallsTotal is incremented. When fn fails,
// neither counter is touched.
func MeasureLatency(fn func() ([]byte, error)) ([]byte, error) {
	began := time.Now()
	payload, callErr := fn()
	if callErr == nil {
		took := time.Since(began).Milliseconds()
		remoteCartLatency.Add(float64(took))
		remoteCartCallsTotal.Inc()
	}
	return payload, callErr
}
// HandleMessage obtains the message payload with GetData(message.Write) and
// sends it to the remote side as a RemoteHandleMutation call for g.Id, timed
// through MeasureLatency. It returns the reply bytes on success. On any
// failure the returned slice is nil, even if the call produced partial data.
//
// isReplay is accepted but not used by this implementation.
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
	payload, err := GetData(message.Write)
	if err != nil {
		return nil, err
	}
	mutate := func() ([]byte, error) {
		return g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, payload)
	}
	response, err := MeasureLatency(mutate)
	if err != nil {
		return nil, err
	}
	return response, nil
}
// GetId returns the cart id stored on the grain.
func (g *RemoteGrain) GetId() CartId {
	id := g.Id
	return id
}
// GetCurrentState issues a RemoteGetState call for g.Id with an empty,
// non-nil payload and returns whatever the call yields. The call is timed
// through MeasureLatency.
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
	fetch := func() ([]byte, error) {
		return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{})
	}
	return MeasureLatency(fetch)
}
// NewRemoteGrainPool returns an empty pool whose Host is addr. No connection
// is made here; the pool only records the address and allocates its map.
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
	pool := new(RemoteGrainPool)
	pool.Host = addr
	pool.grains = map[CartId]*RemoteGrain{}
	return pool
}
// findRemoteGrain returns the grain stored for id, or nil when the pool has
// none. The lookup runs under the pool's read lock.
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
	p.mu.RLock()
	defer p.mu.RUnlock()
	// A missing key yields the map's zero value, which for a pointer is nil.
	return p.grains[id]
}
// findOrCreateGrain returns the pooled grain for id, creating one with
// NewRemoteGrain against the pool's Host and storing it when none exists yet.
//
// If the grain cannot be created, the error from NewRemoteGrain is returned
// and nothing is stored.
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) {
	if grain := p.findRemoteGrain(id); grain != nil {
		return grain, nil
	}

	// Create the grain before taking the write lock so a slow connection
	// attempt does not block other users of the pool.
	created, err := NewRemoteGrain(id, p.Host)
	if err != nil {
		return nil, err
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	// Another goroutine may have stored a grain for id in the meantime; keep
	// that one so every caller shares the same grain. The spare grain is
	// simply dropped, since no way to close its client is visible here.
	if existing := p.grains[id]; existing != nil {
		return existing, nil
	}
	p.grains[id] = created
	return created, nil
}
// Delete removes the grain stored for id from the pool, under the write
// lock. An unknown id is a no-op. The removed grain itself is not touched.
func (p *RemoteGrainPool) Delete(id CartId) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.grains, id)
}
// Process sends messages, in order, to the grain for id and returns the reply
// to the last one. With no messages it returns nil, nil.
//
// Processing stops at the first message that fails: that error is returned
// and the remaining messages are not sent, so a failure is never hidden by a
// later success. An error from obtaining the grain is returned as is.
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
	grain, err := p.findOrCreateGrain(id)
	if err != nil {
		return nil, err
	}

	var result []byte
	// Index the slice instead of ranging by value so each message is passed
	// by pointer without being copied.
	for i := range messages {
		result, err = grain.HandleMessage(&messages[i], false)
		if err != nil {
			return nil, err
		}
	}
	return result, nil
}
// Get returns the current state reported by the grain for id, obtaining the
// grain through findOrCreateGrain first. If the grain cannot be obtained, that
// error is returned with a nil result.
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
	remote, lookupErr := p.findOrCreateGrain(id)
	if lookupErr == nil {
		return remote.GetCurrentState()
	}
	return nil, lookupErr
}