Files
go-cart-actor/rpc-pool.go
matst80 b1f6d8b071
Some checks failed
Build and Publish / BuildAndDeploy (push) Has been cancelled
more mutex
2024-11-09 22:06:30 +01:00

134 lines
2.6 KiB
Go

package main
import (
"fmt"
"io"
"net"
"strings"
"sync"
"time"
)
type RemoteGrainPool struct {
mu sync.RWMutex
Host string
grains map[CartId]RemoteGrain
}
func (id CartId) String() string {
return strings.Trim(string(id[:]), "\x00")
}
func ToCartId(id string) CartId {
var result [16]byte
copy(result[:], []byte(id))
return result
}
type RemoteGrain struct {
connection net.Conn
queue *PacketQueue
Id CartId
Address string
}
func NewRemoteGrain(id CartId, address string) *RemoteGrain {
return &RemoteGrain{
Id: id,
Address: address,
}
}
func (g *RemoteGrain) Connect() error {
if g.connection == nil {
client, err := net.Dial("tcp", g.Address)
if err != nil {
return err
}
g.connection = client
g.queue = NewPacketQueue(client)
}
return nil
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
err := SendCartPacket(g.connection, g.Id, RemoteHandleMessage, message.Write)
if err != nil {
return nil, err
}
packet, err := g.queue.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, err
}
func (g *RemoteGrain) GetId() CartId {
return g.Id
}
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
err := SendCartPacket(g.connection, g.Id, RemoteGetState, func(w io.Writer) error {
return nil
})
if err != nil {
return nil, err
}
packet, err := g.queue.Expect(ResponseBody, time.Second)
if err != nil {
return nil, err
}
return packet.Data, nil
}
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
return &RemoteGrainPool{
Host: addr,
grains: make(map[CartId]RemoteGrain),
}
}
func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain {
p.mu.RLock()
grain, ok := p.grains[id]
p.mu.RUnlock()
if !ok {
return nil
}
grain.Connect()
return &grain
}
func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain {
grain := p.findRemoteGrain(id)
if grain == nil {
grain = NewRemoteGrain(id, p.Host)
p.mu.Lock()
p.grains[id] = *grain
p.mu.Unlock()
grain.Connect()
}
return grain
}
func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) {
var result []byte
var err error
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
for _, message := range messages {
result, err = grain.HandleMessage(&message, false)
}
return result, err
}
func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) {
grain := p.findOrCreateGrain(id)
if grain == nil {
return nil, fmt.Errorf("grain not found")
}
return grain.GetCurrentState()
}