diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 426dfd3..398a564 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -403,7 +403,7 @@ func main() { go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) - //app.Save() + diskStorage.Close() pool.Close() done <- true diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 6fbaeed..dc4c4dd 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -70,8 +70,10 @@ func (s *DiskStorage[V]) save() { } } } + s.queue.Delete(id) return true }) + log.Print("Appended carts to disk") } func (s *DiskStorage[V]) logPath(id uint64) string { @@ -102,7 +104,6 @@ func (s *DiskStorage[V]) Close() { func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { if s.queue != nil { - queue := make([]QueueEvent, 0) data, found := s.queue.Load(id) if found { @@ -112,7 +113,6 @@ func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { s.queue.Store(id, queue) return nil } else { - path := s.logPath(id) fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { diff --git a/pkg/actor/state.go b/pkg/actor/state.go index 257e7db..14020d9 100644 --- a/pkg/actor/state.go +++ b/pkg/actor/state.go @@ -1,6 +1,7 @@ package actor import ( + "bufio" "encoding/json" "errors" "io" @@ -36,8 +37,9 @@ var ErrUnknownType = errors.New("unknown type") func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error { var err error var evt *StorageEvent + scanner := bufio.NewScanner(r) for err == nil { - evt, err = s.Read(r) + evt, err = s.Read(scanner) if err == nil { onMessage(evt.Mutation) } @@ -65,25 +67,32 @@ func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp ti if _, err := io.Write(jsonBytes); err != nil { return err } + io.Write([]byte("\n")) return nil } -func (s *StateStorage) Read(r io.Reader) (*StorageEvent, error) { +func (s *StateStorage) Read(r *bufio.Scanner) (*StorageEvent, error) { var event rawEvent - if err := json.NewDecoder(r).Decode(&event); err != nil { - return nil, err + + if r.Scan() { + b := r.Bytes() + err := json.Unmarshal(b, &event) + if err != nil { + return nil, err + } + typeName := event.Type + mutation, ok := s.registry.Create(typeName) + if !ok { + return nil, ErrUnknownType + } + if err := json.Unmarshal(event.Mutation, mutation); err != nil { + return nil, err + } + return &StorageEvent{ + Type: typeName, + TimeStamp: event.TimeStamp, + Mutation: mutation, + }, r.Err() } - typeName := event.Type - mutation, ok := s.registry.Create(typeName) - if !ok { - return nil, ErrUnknownType - } - if err := json.Unmarshal(event.Mutation, mutation); err != nil { - return nil, err - } - return &StorageEvent{ - Type: typeName, - TimeStamp: event.TimeStamp, - Mutation: mutation, - }, nil + return nil, io.EOF }