diff --git a/cart-grain-pool.go b/cart-grain-pool.go index 7426fa8..54f54f1 100644 --- a/cart-grain-pool.go +++ b/cart-grain-pool.go @@ -219,6 +219,7 @@ func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error { p.localMu.Lock() defer p.localMu.Unlock() for _, id := range ids { + log.Printf("Handling ownership change for cart %d to host %s", id, host) delete(p.grains, id) p.remoteOwners[id] = remoteHost } @@ -226,15 +227,11 @@ func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error { } // SnapshotGrains returns a copy of the currently resident grains keyed by id. -func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain { +func (p *CartPool) SnapshotGrains() map[uint64]*CartGrain { p.localMu.RLock() defer p.localMu.RUnlock() - out := make(map[CartId]*CartGrain, len(p.grains)) - for _, g := range p.grains { - if g != nil { - out[g.GetId()] = g - } - } + out := maps.Clone(p.grains) + return out } @@ -258,6 +255,11 @@ func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain { // --------------------------------------------------------------------------- func (p *CartPool) TakeOwnership(id uint64) { + + if p.grains[id] != nil { + return + } + log.Printf("taking ownership of: %d", id) p.broadcastOwnership([]uint64{id}) } @@ -402,19 +404,19 @@ func (p *CartPool) SendNegotiation() { } p.remoteMu.RUnlock() - for _, r := range remotes { - knownByRemote, err := r.Negotiate(hosts) + p.forAllHosts(func(remote *proxy.RemoteHost) { + knownByRemote, err := remote.Negotiate(hosts) if err != nil { - log.Printf("Negotiate with %s failed: %v", r.Host, err) - continue + log.Printf("Negotiate with %s failed: %v", remote.Host, err) + return } for _, h := range knownByRemote { if !p.IsKnown(h) { - p.AddRemote(h) + go p.AddRemote(h) } } - } + }) } func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) { @@ -493,8 +495,7 @@ func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) { if applyErr == nil && result != nil { cartMutationsTotal.Inc() - //p.RefreshExpiry(id) - //cartActiveGrains.Set(float64(len(p.grains))) + } else if applyErr != nil { cartMutationFailuresTotal.Inc() } diff --git a/main.go b/main.go index fc2eea2..794bcf5 100644 --- a/main.go +++ b/main.go @@ -67,21 +67,21 @@ type App struct { storage *DiskStorage } -func (a *App) Save() error { - for id, grain := range a.pool.SnapshotGrains() { - if grain == nil { - continue - } - if grain.GetLastChange().After(a.storage.LastSaves[uint64(id)]) { +// func (a *App) Save() error { +// for id, grain := range a.pool.SnapshotGrains() { +// if grain == nil { +// continue +// } +// if grain.GetLastChange().After(a.storage.LastSaves[uint64(id)]) { - err := a.storage.Store(id, grain) - if err != nil { - log.Printf("Error saving grain %s: %v\n", id, err) - } - } - } - return nil -} +// err := a.storage.Store(id, grain) +// if err != nil { +// log.Printf("Error saving grain %s: %v\n", id, err) +// } +// } +// } +// return nil +// } var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") @@ -140,20 +140,20 @@ func main() { storage: storage, } - grpcSrv, err := actor.NewControlServer(":1337", pool) + grpcSrv, err := actor.NewControlServer[*CartGrain](":1337", pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() - go func() { - for range time.Tick(time.Minute * 5) { - err := app.Save() - if err != nil { - log.Printf("Error saving: %v\n", err) - } - } - }() + // go func() { + // for range time.Tick(time.Minute * 5) { + // err := app.Save() + // if err != nil { + // log.Printf("Error saving: %v\n", err) + // } + // } + // }() orderHandler := &AmqpOrderHandler{ Url: amqpUrl, } @@ -344,7 +344,7 @@ func main() { go func() { sig := <-sigs fmt.Println("Shutting down due to signal:", sig) - app.Save() + //app.Save() pool.Close() done <- true diff --git a/pkg/proxy/remotehost.go b/pkg/proxy/remotehost.go index 3b17b4a..a7d90b8 100644 --- a/pkg/proxy/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -1,7 +1,6 @@ package proxy import ( - "bytes" "context" "fmt" "io" @@ -153,28 +152,28 @@ func (h *RemoteHost) AnnounceExpiry(uids []uint64) { func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) { target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) - var bodyCopy []byte - if r.Body != nil && r.Body != http.NoBody { - var err error - bodyCopy, err = io.ReadAll(r.Body) - if err != nil { - http.Error(w, "proxy read error", http.StatusBadGateway) - return false, err - } - } - if r.Body != nil { - r.Body.Close() - } - var reqBody io.Reader - if len(bodyCopy) > 0 { - reqBody = bytes.NewReader(bodyCopy) - } - req, err := http.NewRequestWithContext(r.Context(), r.Method, target, reqBody) + // var bodyCopy []byte + // if r.Body != nil && r.Body != http.NoBody { + // var err error + // bodyCopy, err = io.ReadAll(r.Body) + // if err != nil { + // http.Error(w, "proxy read error", http.StatusBadGateway) + // return false, err + // } + // } + // if r.Body != nil { + // r.Body.Close() + // } + // var reqBody io.Reader + // if len(bodyCopy) > 0 { + // reqBody = bytes.NewReader(bodyCopy) + // } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) if err != nil { http.Error(w, "proxy build error", http.StatusBadGateway) return false, err } - r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) + //r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) req.Header.Set("X-Forwarded-Host", r.Host) for k, v := range r.Header { diff --git a/pool-server.go b/pool-server.go index 4b7bcfa..74b5002 100644 --- a/pool-server.go +++ b/pool-server.go @@ -24,12 +24,8 @@ func NewPoolServer(pool *CartPool, pod_name string) *PoolServer { } } -func (s *PoolServer) process(id CartId, mutation interface{}) (*CartGrain, error) { - grain, err := s.pool.Apply(uint64(id), mutation) - if err != nil { - return nil, err - } - return grain, nil +func (s *PoolServer) ApplyLocal(id CartId, mutation interface{}) (*CartGrain, error) { + return s.pool.Apply(uint64(id), mutation) } func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error { @@ -43,7 +39,7 @@ func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id CartId) error { sku := r.PathValue("sku") - data, err := s.process(id, &messages.AddRequest{Sku: sku, Quantity: 1}) + data, err := s.ApplyLocal(id, &messages.AddRequest{Sku: sku, Quantity: 1}) if err != nil { return err } @@ -74,7 +70,7 @@ func (s *PoolServer) HandleDeleteItem(w http.ResponseWriter, r *http.Request, id if err != nil { return err } - data, err := s.process(id, &messages.RemoveItem{Id: int64(itemId)}) + data, err := s.ApplyLocal(id, &messages.RemoveItem{Id: int64(itemId)}) if err != nil { return err } @@ -94,7 +90,7 @@ func (s *PoolServer) HandleSetDelivery(w http.ResponseWriter, r *http.Request, i if err != nil { return err } - data, err := s.process(id, &messages.SetDelivery{ + data, err := s.ApplyLocal(id, &messages.SetDelivery{ Provider: delivery.Provider, Items: delivery.Items, PickupPoint: delivery.PickupPoint, @@ -117,7 +113,7 @@ func (s *PoolServer) HandleSetPickupPoint(w http.ResponseWriter, r *http.Request if err != nil { return err } - reply, err := s.process(id, &messages.SetPickupPoint{ + reply, err := s.ApplyLocal(id, &messages.SetPickupPoint{ DeliveryId: int64(deliveryId), Id: pickupPoint.Id, Name: pickupPoint.Name, @@ -139,7 +135,7 @@ func (s *PoolServer) HandleRemoveDelivery(w http.ResponseWriter, r *http.Request if err != nil { return err } - reply, err := s.process(id, &messages.RemoveDelivery{Id: int64(deliveryId)}) + reply, err := s.ApplyLocal(id, &messages.RemoveDelivery{Id: int64(deliveryId)}) if err != nil { return err } @@ -152,7 +148,7 @@ func (s *PoolServer) HandleQuantityChange(w http.ResponseWriter, r *http.Request if err != nil { return err } - reply, err := s.process(id, &changeQuantity) + reply, err := s.ApplyLocal(id, &changeQuantity) if err != nil { return err } @@ -165,7 +161,7 @@ func (s *PoolServer) HandleSetCartItems(w http.ResponseWriter, r *http.Request, if err != nil { return err } - reply, err := s.process(id, &setCartItems) + reply, err := s.ApplyLocal(id, &setCartItems) if err != nil { return err } @@ -178,7 +174,7 @@ func (s *PoolServer) HandleAddRequest(w http.ResponseWriter, r *http.Request, id if err != nil { return err } - reply, err := s.process(id, &addRequest) + reply, err := s.ApplyLocal(id, &addRequest) if err != nil { return err } @@ -365,14 +361,10 @@ func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request return func(cartId CartId, w http.ResponseWriter, r *http.Request) error { if ownerHost, ok := s.pool.OwnerHost(uint64(cartId)); ok { handled, err := ownerHost.Proxy(uint64(cartId), w, r) - if err != nil { - log.Printf("proxy failed: %v, taking ownership", err) - s.pool.TakeOwnership(uint64(cartId)) - } else if handled { + if err == nil && handled { return nil } } - // Local ownership or no owner known, proceed with local handling return fn(w, r, cartId)