Files
go-cart-actor/pkg/actor/state.go
Mats Törnberg f5014fe906
All checks were successful
Build and Publish / Metadata (push) Successful in 11s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m14s
Build and Publish / BuildAndDeployArm64 (push) Successful in 3m54s
Complete refactor to new grpc control plane and only http proxy for carts (#4)
Co-authored-by: matst80 <mats.tornberg@gmail.com>
Reviewed-on: https://git.tornberg.me/mats/go-cart-actor/pulls/4
Co-authored-by: Mats Törnberg <mats@tornberg.me>
Co-committed-by: Mats Törnberg <mats@tornberg.me>
2025-10-14 22:31:12 +02:00

99 lines
1.9 KiB
Go

package actor
import (
"bufio"
"encoding/json"
"errors"
"io"
"time"
"github.com/gogo/protobuf/proto"
)
type StateStorage struct {
registry MutationRegistry
}
type StorageEvent struct {
Type string `json:"type"`
TimeStamp time.Time `json:"timestamp"`
Mutation proto.Message `json:"mutation"`
}
type rawEvent struct {
Type string `json:"type"`
TimeStamp time.Time `json:"timestamp"`
Mutation json.RawMessage `json:"mutation"`
}
func NewState(registry MutationRegistry) *StateStorage {
return &StateStorage{
registry: registry,
}
}
var ErrUnknownType = errors.New("unknown type")
func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error {
var err error
var evt *StorageEvent
scanner := bufio.NewScanner(r)
for err == nil {
evt, err = s.Read(scanner)
if err == nil {
onMessage(evt.Mutation)
}
}
if err == io.EOF {
return nil
}
return err
}
func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp time.Time) error {
typeName, ok := s.registry.GetTypeName(mutation)
if !ok {
return ErrUnknownType
}
event := &StorageEvent{
Type: typeName,
TimeStamp: timeStamp,
Mutation: mutation,
}
jsonBytes, err := json.Marshal(event)
if err != nil {
return err
}
if _, err := io.Write(jsonBytes); err != nil {
return err
}
io.Write([]byte("\n"))
return nil
}
func (s *StateStorage) Read(r *bufio.Scanner) (*StorageEvent, error) {
var event rawEvent
if r.Scan() {
b := r.Bytes()
err := json.Unmarshal(b, &event)
if err != nil {
return nil, err
}
typeName := event.Type
mutation, ok := s.registry.Create(typeName)
if !ok {
return nil, ErrUnknownType
}
if err := json.Unmarshal(event.Mutation, mutation); err != nil {
return nil, err
}
return &StorageEvent{
Type: typeName,
TimeStamp: event.TimeStamp,
Mutation: mutation,
}, r.Err()
}
return nil, io.EOF
}