queued disk stuff
Some checks are pending
Build and Publish / Metadata (push) Successful in 10s
Build and Publish / BuildAndDeployAmd64 (push) Has started running
Build and Publish / BuildAndDeployArm64 (push) Has started running

This commit is contained in:
matst80
2025-10-13 19:05:12 +02:00
parent 6fbd62936f
commit f3e92c7d65
6 changed files with 87 additions and 38 deletions

View File

@@ -144,8 +144,6 @@ func main() {
ret.lastChange = time.Now() ret.lastChange = time.Now()
ret.lastAccess = time.Now() ret.lastAccess = time.Now()
// Legacy loadMessages (no-op) retained; then replay append-only event log
//_ = loadMessages(ret, id)
err := diskStorage.LoadEvents(id, ret) err := diskStorage.LoadEvents(id, ret)
return ret, err return ret, err
@@ -172,6 +170,8 @@ func main() {
} }
defer grpcSrv.GracefulStop() defer grpcSrv.GracefulStop()
go diskStorage.SaveLoop(10 * time.Second)
go func(hw discovery.Discovery) { go func(hw discovery.Discovery) {
if hw == nil { if hw == nil {
log.Print("No discovery service available") log.Print("No discovery service available")

View File

@@ -1 +0,0 @@
{"type":"AddItem","timestamp":"2025-10-13T15:25:09.772277+02:00","mutation":{"item_id":789396,"quantity":1,"price":18600,"sku":"789396","name":"Samsung Galaxy Z Fold6 Slim S-Pen fodral (grått)","image":"/image/dv_web_D18000128131832/789396/samsung-galaxy-z-fold6-slim-s-pen-fodral-gratt.jpg","tax":2500,"brand":"Samsung","category":"Mobiler, Tablets \u0026 Smartklockor","category2":"Mobiltillbehör","category3":"Mobilskal \u0026 Mobilfodral","articleType":"ZHAW","sellerId":"152","sellerName":"Elgiganten"}}

View File

@@ -6,13 +6,22 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
) )
type QueueEvent struct {
TimeStamp time.Time
Message proto.Message
}
type DiskStorage[V any] struct { type DiskStorage[V any] struct {
*StateStorage *StateStorage
path string path string
done chan struct{}
queue *sync.Map // map[uint64][]QueueEvent
} }
type LogStorage[V any] interface { type LogStorage[V any] interface {
@@ -20,13 +29,51 @@ type LogStorage[V any] interface {
AppendEvent(id uint64, msg proto.Message) error AppendEvent(id uint64, msg proto.Message) error
} }
func NewDiskStorage[V any](path string, registry MutationRegistry) LogStorage[V] { func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
return &DiskStorage[V]{ return &DiskStorage[V]{
StateStorage: NewState(registry), StateStorage: NewState(registry),
path: path, path: path,
done: make(chan struct{}),
} }
} }
func (s *DiskStorage[V]) SaveLoop(duration time.Duration) {
s.queue = &sync.Map{}
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-s.done:
s.save()
return
case <-ticker.C:
s.save()
}
}
}
func (s *DiskStorage[V]) save() {
s.queue.Range(func(key, value any) bool {
id := key.(uint64)
path := s.logPath(id)
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("failed to open event log file: %v", err)
return true
}
defer fh.Close()
if qe, ok := value.([]QueueEvent); ok {
for _, msg := range qe {
if err := s.Append(fh, msg.Message, msg.TimeStamp); err != nil {
log.Printf("failed to append event to log file: %v", err)
}
}
}
return true
})
}
func (s *DiskStorage[V]) logPath(id uint64) string { func (s *DiskStorage[V]) logPath(id uint64) string {
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
} }
@@ -48,7 +95,24 @@ func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
}) })
} }
func (s *DiskStorage[V]) Close() {
s.save()
close(s.done)
}
func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
if s.queue != nil {
queue := make([]QueueEvent, 0)
data, found := s.queue.Load(id)
if found {
queue = data.([]QueueEvent)
}
queue = append(queue, QueueEvent{Message: msg})
s.queue.Store(id, queue)
return nil
} else {
path := s.logPath(id) path := s.logPath(id)
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
@@ -57,6 +121,7 @@ func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
} }
defer fh.Close() defer fh.Close()
return s.Append(fh, msg) return s.Append(fh, msg, time.Now())
}
} }

View File

@@ -3,6 +3,7 @@ package actor
import ( import (
"errors" "errors"
"reflect" "reflect"
"slices"
"testing" "testing"
"git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/messages"
@@ -16,7 +17,7 @@ type cartState struct {
func TestRegisteredMutationBasics(t *testing.T) { func TestRegisteredMutationBasics(t *testing.T) {
reg := NewMutationRegistry().(*ProtoMutationRegistry) reg := NewMutationRegistry().(*ProtoMutationRegistry)
addItemMutation := NewMutation[cartState, *messages.AddItem]( addItemMutation := NewMutation(
func(state *cartState, msg *messages.AddItem) error { func(state *cartState, msg *messages.AddItem) error {
state.calls++ state.calls++
// copy to avoid external mutation side-effects (not strictly necessary for the test) // copy to avoid external mutation side-effects (not strictly necessary for the test)
@@ -39,13 +40,13 @@ func TestRegisteredMutationBasics(t *testing.T) {
// RegisteredMutations: membership (order not guaranteed) // RegisteredMutations: membership (order not guaranteed)
names := reg.RegisteredMutations() names := reg.RegisteredMutations()
if !stringSliceContains(names, "AddItem") { if !slices.Contains(names, "AddItem") {
t.Fatalf("RegisteredMutations missing AddItem, got %v", names) t.Fatalf("RegisteredMutations missing AddItem, got %v", names)
} }
// RegisteredMutationTypes: membership (order not guaranteed) // RegisteredMutationTypes: membership (order not guaranteed)
types := reg.RegisteredMutationTypes() types := reg.RegisteredMutationTypes()
if !typeSliceContains(types, reflect.TypeOf(messages.AddItem{})) { if !slices.Contains(types, reflect.TypeOf(messages.AddItem{})) {
t.Fatalf("RegisteredMutationTypes missing AddItem type, got %v", types) t.Fatalf("RegisteredMutationTypes missing AddItem type, got %v", types)
} }
@@ -131,21 +132,3 @@ func TestRegisteredMutationBasics(t *testing.T) {
// } // }
// Helpers // Helpers
func stringSliceContains(list []string, target string) bool {
for _, s := range list {
if s == target {
return true
}
}
return false
}
func typeSliceContains(list []reflect.Type, target reflect.Type) bool {
for _, t := range list {
if t == target {
return true
}
}
return false
}

View File

@@ -48,14 +48,14 @@ func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) erro
return err return err
} }
func (s *StateStorage) Append(io io.Writer, mutation proto.Message) error { func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp time.Time) error {
typeName, ok := s.registry.GetTypeName(mutation) typeName, ok := s.registry.GetTypeName(mutation)
if !ok { if !ok {
return ErrUnknownType return ErrUnknownType
} }
event := &StorageEvent{ event := &StorageEvent{
Type: typeName, Type: typeName,
TimeStamp: time.Now(), TimeStamp: timeStamp,
Mutation: mutation, Mutation: mutation,
} }
jsonBytes, err := json.Marshal(event) jsonBytes, err := json.Marshal(event)

View File

@@ -2,6 +2,7 @@ package discovery
import ( import (
"context" "context"
"log"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -46,7 +47,7 @@ func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
TimeoutSeconds: &timeout, TimeoutSeconds: &timeout,
}) })
} }
watcher, err := toolsWatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watcherFn}) watcher, err := toolsWatch.NewRetryWatcherWithContext(k.ctx, "1", &cache.ListWatch{WatchFunc: watcherFn})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -55,6 +56,7 @@ func (k *K8sDiscovery) Watch() (<-chan HostChange, error) {
for event := range watcher.ResultChan() { for event := range watcher.ResultChan() {
pod := event.Object.(*v1.Pod) pod := event.Object.(*v1.Pod)
log.Printf("pod change %v", pod.Status)
ch <- HostChange{ ch <- HostChange{
Host: pod.Status.PodIP, Host: pod.Status.PodIP,
Type: event.Type, Type: event.Type,