diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 966506f..12a0492 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/fs" + "log" "net/http" "os" "path/filepath" @@ -17,6 +18,7 @@ import ( "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" + "github.com/gogo/protobuf/proto" ) type FileServer struct { @@ -232,6 +234,22 @@ type JsonError struct { Error string `json:"error"` } +func acceptAll(_ proto.Message, _ int, _ time.Time) bool { + return true +} + +func acceptUntilIndex(maxIndex int) func(msg proto.Message, index int, when time.Time) bool { + return func(msg proto.Message, index int, when time.Time) bool { + return index < maxIndex + } +} + +func acceptUntilTimestamp(until time.Time) func(msg proto.Message, index int, when time.Time) bool { + return func(msg proto.Message, index int, when time.Time) bool { + return when.Before(until) + } +} + func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { idStr := r.PathValue("id") if idStr == "" { @@ -243,10 +261,29 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"}) return } + // parse query parameters for filtering + query := r.URL.Query() + filterFunction := acceptAll + if maxIndexStr := query.Get("maxIndex"); maxIndexStr != "" { + log.Printf("filter maxIndex: %s", maxIndexStr) + maxIndex, err := strconv.Atoi(maxIndexStr) + if err != nil { + writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid maxIndex"}) + return + } + filterFunction = acceptUntilIndex(maxIndex) + } else if untilStr := query.Get("until"); untilStr != "" { + log.Printf("filter until: %s", untilStr) + until, err := time.Parse(time.RFC3339, untilStr) + if err != nil { + writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid until timestamp"}) + return + } + filterFunction = acceptUntilTimestamp(until) + } // reconstruct state from event log if present grain := cart.NewCartGrain(id, time.Now()) - - err := fs.storage.LoadEvents(r.Context(), id, grain) + err := fs.storage.LoadEventsFunc(r.Context(), id, grain, filterFunction) if err != nil { writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()}) return diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index e65a601..50021ed 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -27,6 +27,7 @@ type DiskStorage[V any] struct { type LogStorage[V any] interface { LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error + LoadEventsFunc(ctx context.Context, id uint64, grain Grain[V], condition func(msg proto.Message, index int, timeStamp time.Time) bool) error AppendMutations(id uint64, msg ...proto.Message) error } @@ -87,6 +88,27 @@ func (s *DiskStorage[V]) logPath(id uint64) string { return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) } +func (s *DiskStorage[V]) LoadEventsFunc(ctx context.Context, id uint64, grain Grain[V], condition func(msg proto.Message, index int, timeStamp time.Time) bool) error { + path := s.logPath(id) + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + // No log -> nothing to replay + return nil + } + + fh, err := os.Open(path) + if err != nil { + return fmt.Errorf("open replay file: %w", err) + } + defer fh.Close() + index := 0 + return s.Load(fh, func(msg proto.Message, when time.Time) { + if condition(msg, index, when) { + s.registry.Apply(ctx, grain, msg) + } + index++ + }) +} + func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error { path := s.logPath(id) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { @@ -99,7 +121,7 @@ func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[ return fmt.Errorf("open replay file: %w", err) } defer fh.Close() - return s.Load(fh, func(msg proto.Message) { + return s.Load(fh, func(msg proto.Message, _ time.Time) { s.registry.Apply(ctx, grain, msg) }) } diff --git a/pkg/actor/state.go b/pkg/actor/state.go index 14020d9..b1e70ac 100644 --- a/pkg/actor/state.go +++ b/pkg/actor/state.go @@ -34,14 +34,14 @@ func NewState(registry MutationRegistry) *StateStorage { var ErrUnknownType = errors.New("unknown type") -func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error { +func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message, timeStamp time.Time)) error { var err error var evt *StorageEvent scanner := bufio.NewScanner(r) for err == nil { evt, err = s.Read(scanner) if err == nil { - onMessage(evt.Mutation) + onMessage(evt.Mutation, evt.TimeStamp) } } if err == io.EOF {