package main import ( "fmt" "io" "net" "strings" "sync" "time" ) type RemoteGrainPool struct { mu sync.RWMutex Host string grains map[CartId]RemoteGrain } func (id CartId) String() string { return strings.Trim(string(id[:]), "\x00") } func ToCartId(id string) CartId { var result [16]byte copy(result[:], []byte(id)) return result } type RemoteGrain struct { connection net.Conn queue *PacketQueue Id CartId Address string } func NewRemoteGrain(id CartId, address string) *RemoteGrain { return &RemoteGrain{ Id: id, Address: address, } } func (g *RemoteGrain) Connect() error { if g.connection == nil { client, err := net.Dial("tcp", g.Address) if err != nil { return err } g.connection = client g.queue = NewPacketQueue(client) } return nil } func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) { err := SendCartPacket(g.connection, g.Id, RemoteHandleMessage, message.Write) if err != nil { return nil, err } packet, err := g.queue.Expect(ResponseBody, time.Second) if err != nil { return nil, err } return packet.Data, err } func (g *RemoteGrain) GetId() CartId { return g.Id } func (g *RemoteGrain) GetCurrentState() ([]byte, error) { err := SendCartPacket(g.connection, g.Id, RemoteGetState, func(w io.Writer) error { return nil }) if err != nil { return nil, err } packet, err := g.queue.Expect(ResponseBody, time.Second) if err != nil { return nil, err } return packet.Data, nil } func NewRemoteGrainPool(addr string) *RemoteGrainPool { return &RemoteGrainPool{ Host: addr, grains: make(map[CartId]RemoteGrain), } } func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { p.mu.RLock() grain, ok := p.grains[id] p.mu.RUnlock() if !ok { return nil } grain.Connect() return &grain } func (p *RemoteGrainPool) findOrCreateGrain(id CartId) *RemoteGrain { grain := p.findRemoteGrain(id) if grain == nil { grain = NewRemoteGrain(id, p.Host) p.mu.Lock() p.grains[id] = *grain p.mu.Unlock() grain.Connect() } return grain } func (p *RemoteGrainPool) Process(id CartId, messages ...Message) ([]byte, error) { var result []byte var err error grain := p.findOrCreateGrain(id) if grain == nil { return nil, fmt.Errorf("grain not found") } for _, message := range messages { result, err = grain.HandleMessage(&message, false) } return result, err } func (p *RemoteGrainPool) Get(id CartId) ([]byte, error) { grain := p.findOrCreateGrain(id) if grain == nil { return nil, fmt.Errorf("grain not found") } return grain.GetCurrentState() }