package main import ( "fmt" "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) type RemoteGrainPool struct { mu sync.RWMutex Host string grains map[CartId]*RemoteGrain } func (id CartId) String() string { return strings.Trim(string(id[:]), "\x00") } func ToCartId(id string) CartId { var result [16]byte copy(result[:], []byte(id)) return result } type RemoteGrain struct { *CartClient Id CartId Host string } func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) { client, err := CartDial(fmt.Sprintf("%s:1337", host)) if err != nil { return nil, err } return &RemoteGrain{ Id: id, Host: host, CartClient: client, }, nil } var ( remoteCartLatency = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_remote_grain_calls_total_latency", Help: "The total latency of remote grains", }) remoteCartCallsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_remote_grain_calls_total", Help: "The total number of calls to remote grains", }) ) var start time.Time func MeasureLatency(fn func() ([]byte, error)) ([]byte, error) { start = time.Now() data, err := fn() if err != nil { return data, err } elapsed := time.Since(start).Milliseconds() go func() { remoteCartLatency.Add(float64(elapsed)) remoteCartCallsTotal.Inc() }() return data, nil } func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { data, err := GetData(message.Write) if err != nil { return nil, err } reply, err := MeasureLatency(func() ([]byte, error) { return g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) }) if err != nil { return nil, err } return reply, err } func (g *RemoteGrain) GetId() CartId { return g.Id } func (g *RemoteGrain) GetCurrentState() ([]byte, error) { return MeasureLatency(func() ([]byte, error) { return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{}) }) } func NewRemoteGrainPool(addr string) *RemoteGrainPool { return &RemoteGrainPool{ Host: addr, grains: make(map[CartId]*RemoteGrain), } } func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { p.mu.RLock() grain, ok := p.grains[id] p.mu.RUnlock() if !ok { return nil } return grain } func (p *RemoteGrainPool) findOrCreateGrain(id CartId) (*RemoteGrain, error) { grain := p.findRemoteGrain(id) if grain == nil { grain, err := NewRemoteGrain(id, p.Host) if err != nil { return nil, err } p.mu.Lock() p.grains[id] = grain p.mu.Unlock() } return grain, nil } func (p *RemoteGrainPool) Delete(id CartId) { p.mu.Lock() delete(p.grains, id) p.mu.Unlock() } func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) { var result []byte grain, err := p.findOrCreateGrain(id) if err != nil { return nil, err } for _, message := range messages { result, err = grain.HandleMessage(&message, false) } return result, err } func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) { grain, err := p.findOrCreateGrain(id) if err != nil { return nil, err } return grain.GetCurrentState() }