Files
go-cart-actor/rpc-pool.go
matst80 15d966aaad
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m51s
connect to remote grains if not found
2024-11-10 16:58:52 +01:00

149 lines
2.8 KiB
Go

package main
import (
"fmt"
"io"
"net"
"strings"
"sync"
"time"
)
type RemoteGrainPool struct {
mu sync.RWMutex
Host string
grains map[CartId]*RemoteGrain
}
func (id CartId) String() string {
return strings.Trim(string(id[:]), "\x00")
}
func ToCartId(id string) CartId {
var result [16]byte
copy(result[:], []byte(id))
return result
}
type RemoteGrain struct {
net.Conn
*PacketQueue
Id CartId
Address string
}
func NewRemoteGrain(id CartId, address string) *RemoteGrain {
return &RemoteGrain{
Id: id,
Address: address,
}
}
func (g *RemoteGrain) Connect() error {
if g == nil {
return fmt.Errorf("grain is deleted")
}
if g.connection == nil {
addr := g.Address
if !strings.Contains(addr, ":") {
addr = fmt.Sprintf("%s:1337", addr)
}
client, err := net.Dial("tcp", addr)
if err != nil {
return err
}
g.Conn = client
g.PacketQueue = NewPacketQueue(client)
}
return nil
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
err := g.Connect()
if err != nil {
return nil, err
}
err = SendCartPacket(g.connection, g.Id, RemoteHandleMessage, message.Write)
if err != nil {
return nil, err
}
packet, err := g.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, err
}
func (g *RemoteGrain) GetId() CartId {
return g.Id
}
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
err := g.Connect()
if err != nil {
return nil, err
}
err = SendCartPacket(g.connection, g.Id, RemoteGetState, func(w io.Writer) error {
return nil
})
if err != nil {
return nil, err
}
packet, err := g.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, nil
}
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
return &RemoteGrainPool{
Host: addr,
grains: make(map[CartId]*RemoteGrain),
}
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
p.mu.RLock()
grain, ok := p.grains[id]
p.mu.RUnlock()
if !ok {
return nil
}
grain.Connect()
return grain
}
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain {
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Host)
p.mu.Lock()
p.grains[id] = grain
p.mu.Unlock()
grain.Connect()
}
return grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
var result []byte
var err error
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
for _, message := range messages {
result, err = grain.HandleMessage(&message, false)
}
return result, err
}
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
return grain.GetCurrentState()
}