diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 5a61020..426dfd3 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -144,8 +144,6 @@ func main() { ret.lastChange = time.Now() ret.lastAccess = time.Now() - // Legacy loadMessages (no-op) retained; then replay append-only event log - //_ = loadMessages(ret, id) err := diskStorage.LoadEvents(id, ret) return ret, err @@ -172,6 +170,8 @@ func main() { } defer grpcSrv.GracefulStop() + go diskStorage.SaveLoop(10 * time.Second) + go func(hw discovery.Discovery) { if hw == nil { log.Print("No discovery service available") diff --git a/data/14958337247011543113.events.log b/data/14958337247011543113.events.log deleted file mode 100644 index dd07e7b..0000000 --- a/data/14958337247011543113.events.log +++ /dev/null @@ -1 +0,0 @@ -{"type":"AddItem","timestamp":"2025-10-13T15:25:09.772277+02:00","mutation":{"item_id":789396,"quantity":1,"price":18600,"sku":"789396","name":"Samsung Galaxy Z Fold6 Slim S-Pen fodral (grÄtt)","image":"/image/dv_web_D18000128131832/789396/samsung-galaxy-z-fold6-slim-s-pen-fodral-gratt.jpg","tax":2500,"brand":"Samsung","category":"Mobiler, Tablets \u0026 Smartklockor","category2":"Mobiltillbehör","category3":"Mobilskal \u0026 Mobilfodral","articleType":"ZHAW","sellerId":"152","sellerName":"Elgiganten"}} diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 7ee9207..6fbaeed 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -6,13 +6,22 @@ import ( "log" "os" "path/filepath" + "sync" + "time" "github.com/gogo/protobuf/proto" ) +type QueueEvent struct { + TimeStamp time.Time + Message proto.Message +} + type DiskStorage[V any] struct { *StateStorage - path string + path string + done chan struct{} + queue *sync.Map // map[uint64][]QueueEvent } type LogStorage[V any] interface { @@ -20,13 +29,51 @@ type LogStorage[V any] interface { AppendEvent(id uint64, msg proto.Message) error } -func NewDiskStorage[V any](path string, registry MutationRegistry) LogStorage[V] { +func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { return &DiskStorage[V]{ StateStorage: NewState(registry), path: path, + done: make(chan struct{}), } } +func (s *DiskStorage[V]) SaveLoop(duration time.Duration) { + s.queue = &sync.Map{} + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-s.done: + s.save() + return + case <-ticker.C: + s.save() + } + } +} + +func (s *DiskStorage[V]) save() { + s.queue.Range(func(key, value any) bool { + id := key.(uint64) + path := s.logPath(id) + fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("failed to open event log file: %v", err) + return true + } + defer fh.Close() + + if qe, ok := value.([]QueueEvent); ok { + for _, msg := range qe { + if err := s.Append(fh, msg.Message, msg.TimeStamp); err != nil { + log.Printf("failed to append event to log file: %v", err) + } + } + } + return true + }) +} + func (s *DiskStorage[V]) logPath(id uint64) string { return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) } @@ -48,15 +95,33 @@ func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error { }) } -func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { - path := s.logPath(id) - fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - log.Printf("failed to open event log file: %v", err) - return err - } - defer fh.Close() +func (s *DiskStorage[V]) Close() { + s.save() + close(s.done) +} - return s.Append(fh, msg) +func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { + if s.queue != nil { + + queue := make([]QueueEvent, 0) + data, found := s.queue.Load(id) + if found { + queue = data.([]QueueEvent) + } + queue = append(queue, QueueEvent{Message: msg}) + s.queue.Store(id, queue) + return nil + } else { + + path := s.logPath(id) + fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("failed to open event log file: %v", err) + return err + } + defer fh.Close() + + return s.Append(fh, msg, time.Now()) + } } diff --git a/pkg/actor/mutation_registry_test.go b/pkg/actor/mutation_registry_test.go index 99720b8..5891849 100644 --- a/pkg/actor/mutation_registry_test.go +++ b/pkg/actor/mutation_registry_test.go @@ -3,6 +3,7 @@ package actor import ( "errors" "reflect" + "slices" "testing" "git.tornberg.me/go-cart-actor/pkg/messages" @@ -16,7 +17,7 @@ type cartState struct { func TestRegisteredMutationBasics(t *testing.T) { reg := NewMutationRegistry().(*ProtoMutationRegistry) - addItemMutation := NewMutation[cartState, *messages.AddItem]( + addItemMutation := NewMutation( func(state *cartState, msg *messages.AddItem) error { state.calls++ // copy to avoid external mutation side-effects (not strictly necessary for the test) @@ -39,13 +40,13 @@ func TestRegisteredMutationBasics(t *testing.T) { // RegisteredMutations: membership (order not guaranteed) names := reg.RegisteredMutations() - if !stringSliceContains(names, "AddItem") { + if !slices.Contains(names, "AddItem") { t.Fatalf("RegisteredMutations missing AddItem, got %v", names) } // RegisteredMutationTypes: membership (order not guaranteed) types := reg.RegisteredMutationTypes() - if !typeSliceContains(types, reflect.TypeOf(messages.AddItem{})) { + if !slices.Contains(types, reflect.TypeOf(messages.AddItem{})) { t.Fatalf("RegisteredMutationTypes missing AddItem type, got %v", types) } @@ -131,21 +132,3 @@ func TestRegisteredMutationBasics(t *testing.T) { // } // Helpers - -func stringSliceContains(list []string, target string) bool { - for _, s := range list { - if s == target { - return true - } - } - return false -} - -func typeSliceContains(list []reflect.Type, target reflect.Type) bool { - for _, t := range list { - if t == target { - return true - } - } - return false -} diff --git a/pkg/actor/state.go b/pkg/actor/state.go index be7c4ff..257e7db 100644 --- a/pkg/actor/state.go +++ b/pkg/actor/state.go @@ -48,14 +48,14 @@ func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) erro return err } -func (s *StateStorage) Append(io io.Writer, mutation proto.Message) error { +func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp time.Time) error { typeName, ok := s.registry.GetTypeName(mutation) if !ok { return ErrUnknownType } event := &StorageEvent{ Type: typeName, - TimeStamp: time.Now(), + TimeStamp: timeStamp, Mutation: mutation, } jsonBytes, err := json.Marshal(event) diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 21b7d36..a0d13b9 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "log" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,7 +47,7 @@ func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { TimeoutSeconds: &timeout, }) } - watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn}) + watcher, err := toolsWatch.NewRetryWatcherWithContext(k.ctx, "1", &cache.ListWatch{WatchFunc: watcherFn}) if err != nil { return nil, err } @@ -55,6 +56,7 @@ func (k *K8sDiscovery) Watch() (<-chan HostChange, error) { for event := range watcher.ResultChan() { pod := event.Object.(*v1.Pod) + log.Printf("pod change %v", pod.Status) ch <- HostChange{ Host: pod.Status.PodIP, Type: event.Type,