missing updates #5

Merged
mats merged 77 commits from refactor/http-proxy into main 2025-10-14 23:12:06 +02:00
5 changed files with 33 additions and 26 deletions
Showing only changes of commit 8e60cc2239 - Show all commits

View File

@@ -29,8 +29,8 @@ func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClie
} }
} }
func (s *PoolServer) ApplyLocal(id CartId, mutation proto.Message) (*CartGrain, error) { func (s *PoolServer) ApplyLocal(id CartId, mutation ...proto.Message) (*CartGrain, error) {
return s.Apply(uint64(id), mutation) return s.Apply(uint64(id), mutation...)
} }
func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error { func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error {
@@ -178,28 +178,30 @@ func (s *PoolServer) HandleSetCartItems(w http.ResponseWriter, r *http.Request,
if err != nil { if err != nil {
return err return err
} }
reply, err := s.ApplyLocal(id, &messages.ClearCartRequest{})
if err != nil { msgs := make([]proto.Message, 0, len(setCartItems.Items)+1)
return err msgs = append(msgs, &messages.ClearCartRequest{})
}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, item := range setCartItems.Items { for _, item := range setCartItems.Items {
wg.Add(1) wg.Add(1)
go func(sku string, quantity int) { go func(sku string, quantity int) {
defer wg.Done()
msg, err := GetItemAddMessage(sku, quantity, setCartItems.Country, nil) msg, err := GetItemAddMessage(sku, quantity, setCartItems.Country, nil)
if err != nil { if err != nil {
log.Printf("error adding item %s: %v", sku, err) log.Printf("error adding item %s: %v", sku, err)
return return
} }
reply, err = s.ApplyLocal(id, msg) msgs = append(msgs, msg)
if err != nil {
log.Printf("error applying message %v: %v", msg, err)
return
}
wg.Done()
}(item.Sku, item.Quantity) }(item.Sku, item.Quantity)
} }
wg.Wait() wg.Wait()
reply, err := s.ApplyLocal(id, msgs...)
if err != nil {
return err
}
return s.WriteResult(w, reply) return s.WriteResult(w, reply)
} }

View File

@@ -26,7 +26,7 @@ type DiskStorage[V any] struct {
type LogStorage[V any] interface { type LogStorage[V any] interface {
LoadEvents(id uint64, grain Grain[V]) error LoadEvents(id uint64, grain Grain[V]) error
AppendEvent(id uint64, msg proto.Message) error AppendEvent(id uint64, msg ...proto.Message) error
} }
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
@@ -102,14 +102,16 @@ func (s *DiskStorage[V]) Close() {
close(s.done) close(s.done)
} }
func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { func (s *DiskStorage[V]) AppendEvent(id uint64, msg ...proto.Message) error {
if s.queue != nil { if s.queue != nil {
queue := make([]QueueEvent, 0) queue := make([]QueueEvent, 0)
data, found := s.queue.Load(id) data, found := s.queue.Load(id)
if found { if found {
queue = data.([]QueueEvent) queue = data.([]QueueEvent)
} }
queue = append(queue, QueueEvent{Message: msg}) for _, m := range msg {
queue = append(queue, QueueEvent{Message: m, TimeStamp: time.Now()})
}
s.queue.Store(id, queue) s.queue.Store(id, queue)
return nil return nil
} else { } else {
@@ -120,8 +122,10 @@ func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
return err return err
} }
defer fh.Close() defer fh.Close()
for _, m := range msg {
return s.Append(fh, msg, time.Now()) err = s.Append(fh, m, time.Now())
}
return err
} }
} }

View File

@@ -7,7 +7,7 @@ import (
) )
type GrainPool[V any] interface { type GrainPool[V any] interface {
Apply(id uint64, mutation proto.Message) (V, error) Apply(id uint64, mutation ...proto.Message) (V, error)
Get(id uint64) (V, error) Get(id uint64) (V, error)
OwnerHost(id uint64) (Host, bool) OwnerHost(id uint64) (Host, bool)
Hostname() string Hostname() string

View File

@@ -10,7 +10,7 @@ import (
) )
type MutationRegistry interface { type MutationRegistry interface {
Apply(grain any, msg proto.Message) error Apply(grain any, msg ...proto.Message) error
RegisterMutations(handlers ...MutationHandler) RegisterMutations(handlers ...MutationHandler)
Create(typeName string) (proto.Message, bool) Create(typeName string) (proto.Message, bool)
GetTypeName(msg proto.Message) (string, bool) GetTypeName(msg proto.Message) (string, bool)
@@ -145,7 +145,7 @@ func (r *ProtoMutationRegistry) Create(typeName string) (proto.Message, bool) {
// Returns updated grain if successful. // Returns updated grain if successful.
// //
// If the mutation is not registered, returns (nil, ErrMutationNotRegistered). // If the mutation is not registered, returns (nil, ErrMutationNotRegistered).
func (r *ProtoMutationRegistry) Apply(grain any, msg proto.Message) error { func (r *ProtoMutationRegistry) Apply(grain any, msg ...proto.Message) error {
if grain == nil { if grain == nil {
return fmt.Errorf("nil grain") return fmt.Errorf("nil grain")
} }
@@ -161,10 +161,11 @@ func (r *ProtoMutationRegistry) Apply(grain any, msg proto.Message) error {
if !ok { if !ok {
return ErrMutationNotRegistered return ErrMutationNotRegistered
} }
for _, m := range msg {
if err := entry.Handle(grain, msg); err != nil { if err := entry.Handle(grain, m); err != nil {
return err return err
} }
}
// if entry.updateTotals { // if entry.updateTotals {
// grain.UpdateTotals() // grain.UpdateTotals()

View File

@@ -362,17 +362,17 @@ func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
// var ErrNotOwner = fmt.Errorf("not owner") // var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain. // Apply applies a mutation to a grain.
func (p *SimpleGrainPool[V]) Apply(id uint64, mutation proto.Message) (*V, error) { func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*V, error) {
grain, err := p.getOrClaimGrain(id) grain, err := p.getOrClaimGrain(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if applyErr := p.mutationRegistry.Apply(grain, mutation); applyErr != nil { if applyErr := p.mutationRegistry.Apply(grain, mutation...); applyErr != nil {
return nil, applyErr return nil, applyErr
} }
if p.storage != nil { if p.storage != nil {
go func() { go func() {
if err := p.storage.AppendEvent(id, mutation); err != nil { if err := p.storage.AppendEvent(id, mutation...); err != nil {
log.Printf("failed to store mutation for grain %d: %v", id, err) log.Printf("failed to store mutation for grain %d: %v", id, err)
} }
}() }()