update the code
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -17,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/actor"
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileServer struct {
|
type FileServer struct {
|
||||||
@@ -232,6 +234,22 @@ type JsonError struct {
|
|||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func acceptAll(_ proto.Message, _ int, _ time.Time) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func acceptUntilIndex(maxIndex int) func(msg proto.Message, index int, when time.Time) bool {
|
||||||
|
return func(msg proto.Message, index int, when time.Time) bool {
|
||||||
|
return index < maxIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func acceptUntilTimestamp(until time.Time) func(msg proto.Message, index int, when time.Time) bool {
|
||||||
|
return func(msg proto.Message, index int, when time.Time) bool {
|
||||||
|
return when.Before(until)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
idStr := r.PathValue("id")
|
idStr := r.PathValue("id")
|
||||||
if idStr == "" {
|
if idStr == "" {
|
||||||
@@ -243,10 +261,29 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"})
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// parse query parameters for filtering
|
||||||
|
query := r.URL.Query()
|
||||||
|
filterFunction := acceptAll
|
||||||
|
if maxIndexStr := query.Get("maxIndex"); maxIndexStr != "" {
|
||||||
|
log.Printf("filter maxIndex: %s", maxIndexStr)
|
||||||
|
maxIndex, err := strconv.Atoi(maxIndexStr)
|
||||||
|
if err != nil {
|
||||||
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid maxIndex"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filterFunction = acceptUntilIndex(maxIndex)
|
||||||
|
} else if untilStr := query.Get("until"); untilStr != "" {
|
||||||
|
log.Printf("filter until: %s", untilStr)
|
||||||
|
until, err := time.Parse(time.RFC3339, untilStr)
|
||||||
|
if err != nil {
|
||||||
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid until timestamp"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filterFunction = acceptUntilTimestamp(until)
|
||||||
|
}
|
||||||
// reconstruct state from event log if present
|
// reconstruct state from event log if present
|
||||||
grain := cart.NewCartGrain(id, time.Now())
|
grain := cart.NewCartGrain(id, time.Now())
|
||||||
|
err := fs.storage.LoadEventsFunc(r.Context(), id, grain, filterFunction)
|
||||||
err := fs.storage.LoadEvents(r.Context(), id, grain)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ type DiskStorage[V any] struct {
|
|||||||
|
|
||||||
type LogStorage[V any] interface {
|
type LogStorage[V any] interface {
|
||||||
LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error
|
LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error
|
||||||
|
LoadEventsFunc(ctx context.Context, id uint64, grain Grain[V], condition func(msg proto.Message, index int, timeStamp time.Time) bool) error
|
||||||
AppendMutations(id uint64, msg ...proto.Message) error
|
AppendMutations(id uint64, msg ...proto.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,6 +88,27 @@ func (s *DiskStorage[V]) logPath(id uint64) string {
|
|||||||
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
|
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *DiskStorage[V]) LoadEventsFunc(ctx context.Context, id uint64, grain Grain[V], condition func(msg proto.Message, index int, timeStamp time.Time) bool) error {
|
||||||
|
path := s.logPath(id)
|
||||||
|
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||||
|
// No log -> nothing to replay
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fh, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open replay file: %w", err)
|
||||||
|
}
|
||||||
|
defer fh.Close()
|
||||||
|
index := 0
|
||||||
|
return s.Load(fh, func(msg proto.Message, when time.Time) {
|
||||||
|
if condition(msg, index, when) {
|
||||||
|
s.registry.Apply(ctx, grain, msg)
|
||||||
|
}
|
||||||
|
index++
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error {
|
func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[V]) error {
|
||||||
path := s.logPath(id)
|
path := s.logPath(id)
|
||||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||||
@@ -99,7 +121,7 @@ func (s *DiskStorage[V]) LoadEvents(ctx context.Context, id uint64, grain Grain[
|
|||||||
return fmt.Errorf("open replay file: %w", err)
|
return fmt.Errorf("open replay file: %w", err)
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
return s.Load(fh, func(msg proto.Message) {
|
return s.Load(fh, func(msg proto.Message, _ time.Time) {
|
||||||
s.registry.Apply(ctx, grain, msg)
|
s.registry.Apply(ctx, grain, msg)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,14 +34,14 @@ func NewState(registry MutationRegistry) *StateStorage {
|
|||||||
|
|
||||||
var ErrUnknownType = errors.New("unknown type")
|
var ErrUnknownType = errors.New("unknown type")
|
||||||
|
|
||||||
func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error {
|
func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message, timeStamp time.Time)) error {
|
||||||
var err error
|
var err error
|
||||||
var evt *StorageEvent
|
var evt *StorageEvent
|
||||||
scanner := bufio.NewScanner(r)
|
scanner := bufio.NewScanner(r)
|
||||||
for err == nil {
|
for err == nil {
|
||||||
evt, err = s.Read(scanner)
|
evt, err = s.Read(scanner)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
onMessage(evt.Mutation)
|
onMessage(evt.Mutation, evt.TimeStamp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
|||||||
Reference in New Issue
Block a user