Files
go-cart-actor/rpc-pool.go
2024-11-08 23:02:09 +01:00

111 lines
2.0 KiB
Go

package main
import (
"io"
"net"
"strings"
)
type RemoteGrainPool struct {
Hosts []string
grains map[CartId]RemoteGrain
}
func (id CartId) String() string {
return strings.Trim(string(id[:]), "\x00")
}
func ToCartId(id string) CartId {
var result [16]byte
copy(result[:], []byte(id))
return result
}
type RemoteGrain struct {
client net.Conn
Id CartId
Address string
}
func NewRemoteGrain(id CartId, address string) *RemoteGrain {
return &RemoteGrain{
Id: id,
Address: address,
}
}
func (g *RemoteGrain) Connect() error {
if g.client == nil {
client, err := net.Dial("tcp", g.Address)
if err != nil {
return err
}
g.client = client
}
return nil
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
err := SendCartPacket(g.client, g.Id, RemoteHandleMessage, message.Write)
if err != nil {
return nil, err
}
_, data, err := ReceivePacket(g.client)
return data, err
}
func (g *RemoteGrain) GetId() CartId {
return g.Id
}
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
err := SendCartPacket(g.client, g.Id, RemoteGetState, func(w io.Writer) error {
return nil
})
if err != nil {
return nil, err
}
_, data, err := ReceivePacket(g.client)
return data, err
}
func NewRemoteGrainPool(addr ...string) *RemoteGrainPool {
return &RemoteGrainPool{
Hosts: addr,
grains: make(map[CartId]RemoteGrain),
}
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
grain, ok := p.grains[id]
if !ok {
return nil
}
return &grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
var result []byte
var err error
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Hosts[0])
grain.Connect()
p.grains[id] = *grain
}
for _, message := range messages {
result, err = grain.HandleMessage(&message, false)
}
return result, err
}
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
grain := p.findRemoteGrain(id)
if grain == nil {
return nil, nil
}
return grain.GetCurrentState()
}