Files
go-cart-actor/rpc-pool.go
matst80 411b91252b
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m47s
implement queue
2024-11-09 20:54:23 +01:00

128 lines
2.5 KiB
Go

package main
import (
"fmt"
"io"
"net"
"strings"
"time"
)
type RemoteGrainPool struct {
Host string
grains map[CartId]RemoteGrain
}
func (id CartId) String() string {
return strings.Trim(string(id[:]), "\x00")
}
func ToCartId(id string) CartId {
var result [16]byte
copy(result[:], []byte(id))
return result
}
type RemoteGrain struct {
connection net.Conn
queue *PacketQueue
Id CartId
Address string
}
func NewRemoteGrain(id CartId, address string) *RemoteGrain {
return &RemoteGrain{
Id: id,
Address: address,
}
}
func (g *RemoteGrain) Connect() error {
if g.connection == nil {
client, err := net.Dial("tcp", g.Address)
if err != nil {
return err
}
g.connection = client
g.queue = NewPacketQueue(client)
}
return nil
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
err := SendCartPacket(g.connection, g.Id, RemoteHandleMessage, message.Write)
if err != nil {
return nil, err
}
packet, err := g.queue.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, err
}
func (g *RemoteGrain) GetId() CartId {
return g.Id
}
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
err := SendCartPacket(g.connection, g.Id, RemoteGetState, func(w io.Writer) error {
return nil
})
if err != nil {
return nil, err
}
packet, err := g.queue.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, nil
}
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
return &RemoteGrainPool{
Host: addr,
grains: make(map[CartId]RemoteGrain),
}
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
grain, ok := p.grains[id]
if !ok {
return nil
}
grain.Connect()
return &grain
}
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain {
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Host)
p.grains[id] = *grain
grain.Connect()
}
return grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
var result []byte
var err error
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
for _, message := range messages {
result, err = grain.HandleMessage(&message, false)
}
return result, err
}
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
return grain.GetCurrentState()
}