package actor import ( "bufio" "encoding/json" "errors" "io" "time" "google.golang.org/protobuf/proto" ) type StateStorage struct { registry MutationRegistry } type StorageEvent struct { Type string `json:"type"` TimeStamp time.Time `json:"timestamp"` Mutation proto.Message `json:"mutation"` } type rawEvent struct { Type string `json:"type"` TimeStamp time.Time `json:"timestamp"` Mutation json.RawMessage `json:"mutation"` } func NewState(registry MutationRegistry) *StateStorage { return &StateStorage{ registry: registry, } } var ErrUnknownType = errors.New("unknown type") func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message, timeStamp time.Time)) error { var err error var evt *StorageEvent scanner := bufio.NewScanner(r) for err == nil { evt, err = s.Read(scanner) if err == nil { onMessage(evt.Mutation, evt.TimeStamp) } } if err == io.EOF { return nil } return err } func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp time.Time) error { typeName, ok := s.registry.GetTypeName(mutation) if !ok { return ErrUnknownType } event := &StorageEvent{ Type: typeName, TimeStamp: timeStamp, Mutation: mutation, } jsonBytes, err := json.Marshal(event) if err != nil { return err } if _, err := io.Write(jsonBytes); err != nil { return err } io.Write([]byte("\n")) return nil } func (s *StateStorage) Read(r *bufio.Scanner) (*StorageEvent, error) { var event rawEvent if r.Scan() { b := r.Bytes() err := json.Unmarshal(b, &event) if err != nil { return nil, err } typeName := event.Type mutation, ok := s.registry.Create(typeName) if !ok { return nil, ErrUnknownType } if err := json.Unmarshal(event.Mutation, mutation); err != nil { return nil, err } return &StorageEvent{ Type: typeName, TimeStamp: event.TimeStamp, Mutation: mutation, }, r.Err() } return nil, io.EOF }