From 0ba74101628b0c36b59208e83947fff8a9574a0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Sat, 11 Oct 2025 18:17:31 +0200 Subject: [PATCH] even more refactoring --- cart-grain.go | 23 +++--- discovery.go | 95 ---------------------- discovery_mock.go | 102 +++++++++++++++++++++++ grain-pool.go | 187 ++++++++----------------------------------- remotehost.go | 200 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 343 insertions(+), 264 deletions(-) create mode 100644 discovery_mock.go create mode 100644 remotehost.go diff --git a/cart-grain.go b/cart-grain.go index 1f5d100..d36f10f 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -113,12 +113,16 @@ func getItemData(sku string, qty int, country string) (*messages.AddItem, error) } stock := InStock - /*item.t - if item.StockLevel == "0" || item.StockLevel == "" { + item.HasStock() + stockValue, ok := item.GetNumberFieldValue(3) + if !ok || stockValue == 0 { stock = OutOfStock - } else if item.StockLevel == "5+" { - stock = LowStock - }*/ + } else { + if stockValue < 5 { + stock = LowStock + } + } + articleType, _ := item.GetStringFieldValue(1) //.Fields[1].(string) outletGrade, ok := item.GetStringFieldValue(20) //.Fields[20].(string) var outlet *string @@ -169,15 +173,6 @@ func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string return c.Apply(cartItem, false) } -/* -Legacy storage (event sourcing) removed in oneof refactor. -Kept stub (commented) for potential future reintroduction using proto envelopes. - -func (c *CartGrain) GetStorageMessage(since int64) []interface{} { - return nil -} -*/ - func (c *CartGrain) GetState() ([]byte, error) { return json.Marshal(c) } diff --git a/discovery.go b/discovery.go index 9681929..20952a9 100644 --- a/discovery.go +++ b/discovery.go @@ -2,7 +2,6 @@ package main import ( "context" - "sync" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -76,97 +75,3 @@ func NewK8sDiscovery(client *kubernetes.Clientset) *K8sDiscovery { client: client, } } - -// MockDiscovery is an in-memory Discovery implementation for tests. -// It allows deterministic injection of host additions/removals without -// depending on Kubernetes API machinery. -type MockDiscovery struct { - mu sync.RWMutex - hosts []string - events chan HostChange - closed bool - started bool -} - -// NewMockDiscovery creates a mock discovery with an initial host list. -func NewMockDiscovery(initial []string) *MockDiscovery { - cp := make([]string, len(initial)) - copy(cp, initial) - return &MockDiscovery{ - hosts: cp, - events: make(chan HostChange, 32), - } -} - -// Discover returns the current host snapshot. -func (m *MockDiscovery) Discover() ([]string, error) { - m.mu.RLock() - defer m.mu.RUnlock() - cp := make([]string, len(m.hosts)) - copy(cp, m.hosts) - return cp, nil -} - -// Watch returns a channel that will receive HostChange events. -// The channel is buffered; AddHost/RemoveHost push events non-blockingly. -func (m *MockDiscovery) Watch() (<-chan HostChange, error) { - m.mu.Lock() - defer m.mu.Unlock() - if m.closed { - return nil, context.Canceled - } - m.started = true - return m.events, nil -} - -// AddHost inserts a new host (if absent) and emits an Added event. -func (m *MockDiscovery) AddHost(host string) { - m.mu.Lock() - defer m.mu.Unlock() - if m.closed { - return - } - for _, h := range m.hosts { - if h == host { - return - } - } - m.hosts = append(m.hosts, host) - if m.started { - m.events <- HostChange{Host: host, Type: watch.Added} - } -} - -// RemoveHost removes a host (if present) and emits a Deleted event. -func (m *MockDiscovery) RemoveHost(host string) { - m.mu.Lock() - defer m.mu.Unlock() - if m.closed { - return - } - idx := -1 - for i, h := range m.hosts { - if h == host { - idx = i - break - } - } - if idx == -1 { - return - } - m.hosts = append(m.hosts[:idx], m.hosts[idx+1:]...) - if m.started { - m.events <- HostChange{Host: host, Type: watch.Deleted} - } -} - -// Close closes the event channel (idempotent). -func (m *MockDiscovery) Close() { - m.mu.Lock() - defer m.mu.Unlock() - if m.closed { - return - } - m.closed = true - close(m.events) -} diff --git a/discovery_mock.go b/discovery_mock.go new file mode 100644 index 0000000..c770d61 --- /dev/null +++ b/discovery_mock.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/watch" +) + +// MockDiscovery is an in-memory Discovery implementation for tests. +// It allows deterministic injection of host additions/removals without +// depending on Kubernetes API machinery. +type MockDiscovery struct { + mu sync.RWMutex + hosts []string + events chan HostChange + closed bool + started bool +} + +// NewMockDiscovery creates a mock discovery with an initial host list. +func NewMockDiscovery(initial []string) *MockDiscovery { + cp := make([]string, len(initial)) + copy(cp, initial) + return &MockDiscovery{ + hosts: cp, + events: make(chan HostChange, 32), + } +} + +// Discover returns the current host snapshot. +func (m *MockDiscovery) Discover() ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + cp := make([]string, len(m.hosts)) + copy(cp, m.hosts) + return cp, nil +} + +// Watch returns a channel that will receive HostChange events. +// The channel is buffered; AddHost/RemoveHost push events non-blockingly. +func (m *MockDiscovery) Watch() (<-chan HostChange, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return nil, context.Canceled + } + m.started = true + return m.events, nil +} + +// AddHost inserts a new host (if absent) and emits an Added event. +func (m *MockDiscovery) AddHost(host string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + for _, h := range m.hosts { + if h == host { + return + } + } + m.hosts = append(m.hosts, host) + if m.started { + m.events <- HostChange{Host: host, Type: watch.Added} + } +} + +// RemoveHost removes a host (if present) and emits a Deleted event. +func (m *MockDiscovery) RemoveHost(host string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + idx := -1 + for i, h := range m.hosts { + if h == host { + idx = i + break + } + } + if idx == -1 { + return + } + m.hosts = append(m.hosts[:idx], m.hosts[idx+1:]...) + if m.started { + m.events <- HostChange{Host: host, Type: watch.Deleted} + } +} + +// Close closes the event channel (idempotent). +func (m *MockDiscovery) Close() { + m.mu.Lock() + defer m.mu.Unlock() + if m.closed { + return + } + m.closed = true + close(m.events) +} diff --git a/grain-pool.go b/grain-pool.go index e4bddff..c5574e9 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -1,10 +1,8 @@ package main import ( - "bytes" "context" "fmt" - "io" "log" "net/http" "reflect" @@ -14,8 +12,6 @@ import ( messages "git.tornberg.me/go-cart-actor/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "k8s.io/apimachinery/pkg/watch" ) @@ -99,92 +95,16 @@ type CartPool struct { poolSize int // Cluster coordination -------------------------------------------------- - hostname string - remoteMu sync.RWMutex - remoteOwners map[CartId]*RemoteHostGRPC - remoteHosts map[string]*RemoteHostGRPC - discardedHostHandler *DiscardedHostHandler + hostname string + remoteMu sync.RWMutex + remoteOwners map[CartId]*RemoteHostGRPC + remoteHosts map[string]*RemoteHostGRPC + //discardedHostHandler *DiscardedHostHandler // House-keeping --------------------------------------------------------- purgeTicker *time.Ticker } -// RemoteHostGRPC mirrors the lightweight controller used for remote node -// interaction. -type RemoteHostGRPC struct { - Host string - HTTPBase string - Conn *grpc.ClientConn - Transport *http.Transport - Client *http.Client - ControlClient messages.ControlPlaneClient - MissedPings int -} - -func (h *RemoteHostGRPC) Name() string { - return h.Host -} - -func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { - target := fmt.Sprintf("%s%s", h.HTTPBase, r.URL.RequestURI()) - var bodyCopy []byte - if r.Body != nil && r.Body != http.NoBody { - var err error - bodyCopy, err = io.ReadAll(r.Body) - if err != nil { - http.Error(w, "proxy read error", http.StatusBadGateway) - return false, err - } - } - if r.Body != nil { - r.Body.Close() - } - var reqBody io.Reader - if len(bodyCopy) > 0 { - reqBody = bytes.NewReader(bodyCopy) - } - req, err := http.NewRequestWithContext(r.Context(), r.Method, target, reqBody) - if err != nil { - http.Error(w, "proxy build error", http.StatusBadGateway) - return false, err - } - r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) - req.Header.Set("X-Forwarded-Host", r.Host) - if idStr := id.String(); idStr != "" { - req.Header.Set("X-Cart-Id", idStr) - } - for k, v := range r.Header { - for _, vv := range v { - req.Header.Add(k, vv) - } - } - res, err := h.Client.Do(req) - if err != nil { - http.Error(w, "proxy request error", http.StatusBadGateway) - return false, err - } - defer res.Body.Close() - for k, v := range res.Header { - for _, vv := range v { - w.Header().Add(k, vv) - } - } - w.Header().Set("X-Cart-Owner-Routed", "true") - if res.StatusCode >= 200 && res.StatusCode <= 299 { - w.WriteHeader(res.StatusCode) - _, copyErr := io.Copy(w, res.Body) - if copyErr != nil { - return true, copyErr - } - return true, nil - } - return false, fmt.Errorf("proxy response status %d", res.StatusCode) -} - -func (r *RemoteHostGRPC) IsHealthy() bool { - return r.MissedPings < 3 -} - // NewCartPool constructs a unified pool. Discovery may be nil for standalone // deployments. func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), discovery Discovery) (*CartPool, error) { @@ -199,8 +119,8 @@ func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id Car remoteHosts: make(map[string]*RemoteHostGRPC), } - p.discardedHostHandler = NewDiscardedHostHandler(1338) - p.discardedHostHandler.SetReconnectHandler(p.AddRemote) + // p.discardedHostHandler = NewDiscardedHostHandler(1338) + // p.discardedHostHandler.SetReconnectHandler(p.AddRemote) p.purgeTicker = time.NewTicker(time.Minute) go func() { @@ -429,57 +349,22 @@ func (p *CartPool) TakeOwnership(id CartId) { p.broadcastOwnership([]CartId{id}) } -func (p *CartPool) AddRemote(host string) { +func (p *CartPool) AddRemote(host string) (*RemoteHostGRPC, error) { if host == "" || host == p.hostname { - return + return nil, fmt.Errorf("invalid host") } p.remoteMu.Lock() if _, exists := p.remoteHosts[host]; exists { p.remoteMu.Unlock() - return + return nil, fmt.Errorf("host already exists") } p.remoteMu.Unlock() - target := fmt.Sprintf("%s:1337", host) - dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - conn, err := grpc.DialContext(dialCtx, target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) - cancel() + remote, err := NewRemoteHostGRPC(host) if err != nil { - log.Printf("AddRemote: dial %s failed: %v", target, err) - return - } - - controlClient := messages.NewControlPlaneClient(conn) - for retries := 0; retries < 3; retries++ { - ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) - _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) - pingCancel() - if pingErr == nil { - break - } - if retries == 2 { - log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) - conn.Close() - return - } - time.Sleep(200 * time.Millisecond) - } - - transport := &http.Transport{ - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 120 * time.Second, - } - client := &http.Client{Transport: transport, Timeout: 10 * time.Second} - - remote := &RemoteHostGRPC{ - Host: host, - HTTPBase: fmt.Sprintf("http://%s:8080/cart", host), - Conn: conn, - Transport: transport, - Client: client, - ControlClient: controlClient, + log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err) + return nil, err } p.remoteMu.Lock() @@ -488,10 +373,10 @@ func (p *CartPool) AddRemote(host string) { connectedRemotes.Set(float64(p.RemoteCount())) log.Printf("Connected to remote host %s", host) - go p.pingLoop(remote) go p.initializeRemote(remote) go p.Negotiate() + return remote, nil } func (p *CartPool) initializeRemote(remote *RemoteHostGRPC) { @@ -519,6 +404,7 @@ func (p *CartPool) RemoveHost(host string) { p.remoteMu.Lock() remote, exists := p.remoteHosts[host] if exists { + go remote.Close() delete(p.remoteHosts, host) } for id, owner := range p.remoteOwners { @@ -562,23 +448,18 @@ func (p *CartPool) IsKnown(host string) bool { } func (p *CartPool) pingLoop(remote *RemoteHostGRPC) { - ticker := time.NewTicker(3 * time.Second) + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - _, err := remote.ControlClient.Ping(ctx, &messages.Empty{}) - cancel() - if err != nil { - remote.MissedPings++ - log.Printf("Ping %s failed (%d)", remote.Host, remote.MissedPings) + if !remote.Ping() { if !remote.IsHealthy() { log.Printf("Remote %s unhealthy, removing", remote.Host) + p.Close() p.RemoveHost(remote.Host) return } continue } - remote.MissedPings = 0 } } @@ -607,14 +488,13 @@ func (p *CartPool) Negotiate() { p.remoteMu.RUnlock() for _, r := range remotes { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - reply, err := r.ControlClient.Negotiate(ctx, &messages.NegotiateRequest{KnownHosts: hosts}) - cancel() + knownByRemote, err := r.Negotiate(hosts) + if err != nil { log.Printf("Negotiate with %s failed: %v", r.Host, err) continue } - for _, h := range reply.Hosts { + for _, h := range knownByRemote { if !p.IsKnown(h) { p.AddRemote(h) } @@ -636,19 +516,16 @@ func (p *CartPool) broadcastOwnership(ids []CartId) { for _, r := range p.remoteHosts { if r.IsHealthy() { remotes = append(remotes, r) + } else { + log.Printf("Skipping announce to unhealthy remote %s", r.Host) + p.RemoveHost(r.Host) } } p.remoteMu.RUnlock() for _, remote := range remotes { go func(rh *RemoteHostGRPC) { - _, err := rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ - Host: p.hostname, - CartIds: uids, - }) - if err != nil { - log.Printf("ownership announce to %s failed: %v", rh.Host, err) - } + rh.AnnounceOwnership(uids) }(remote) } } @@ -673,13 +550,7 @@ func (p *CartPool) broadcastExpiry(ids []CartId) { for _, remote := range remotes { go func(rh *RemoteHostGRPC) { - _, err := rh.ControlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ - Host: p.hostname, - CartIds: uids, - }) - if err != nil { - log.Printf("expiry announce to %s failed: %v", rh.Host, err) - } + rh.AnnounceExpiry(uids) }(remote) } } @@ -691,6 +562,12 @@ func (p *CartPool) AdoptRemoteOwnership(host string, ids []string) { remoteHost, ok := p.remoteHosts[host] if !ok { log.Printf("AdoptRemoteOwnership: unknown host %s", host) + createdHost, err := p.AddRemote(host) + if err != nil { + log.Printf("AdoptRemoteOwnership: failed to add remote %s: %v", host, err) + return + } + remoteHost = createdHost } p.remoteMu.Lock() defer p.remoteMu.Unlock() diff --git a/remotehost.go b/remotehost.go new file mode 100644 index 0000000..348ead0 --- /dev/null +++ b/remotehost.go @@ -0,0 +1,200 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "time" + + messages "git.tornberg.me/go-cart-actor/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// RemoteHostGRPC mirrors the lightweight controller used for remote node +// interaction. +type RemoteHostGRPC struct { + Host string + HTTPBase string + Conn *grpc.ClientConn + Transport *http.Transport + Client *http.Client + ControlClient messages.ControlPlaneClient + MissedPings int +} + +func NewRemoteHostGRPC(host string) (*RemoteHostGRPC, error) { + + target := fmt.Sprintf("%s:1337", host) + + conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + + if err != nil { + log.Printf("AddRemote: dial %s failed: %v", target, err) + return nil, err + } + + controlClient := messages.NewControlPlaneClient(conn) + for retries := 0; retries < 3; retries++ { + ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) + _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) + pingCancel() + if pingErr == nil { + break + } + if retries == 2 { + log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr) + conn.Close() + return nil, pingErr + } + time.Sleep(200 * time.Millisecond) + } + + transport := &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 120 * time.Second, + } + client := &http.Client{Transport: transport, Timeout: 10 * time.Second} + + return &RemoteHostGRPC{ + Host: host, + HTTPBase: fmt.Sprintf("http://%s:8080/cart", host), + Conn: conn, + Transport: transport, + Client: client, + ControlClient: controlClient, + MissedPings: 0, + }, nil +} + +func (h *RemoteHostGRPC) Name() string { + return h.Host +} + +func (h *RemoteHostGRPC) Close() error { + if h.Conn != nil { + h.Conn.Close() + } + return nil +} + +func (h *RemoteHostGRPC) Ping() bool { + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, err := h.ControlClient.Ping(ctx, &messages.Empty{}) + cancel() + if err != nil { + h.MissedPings++ + log.Printf("Ping %s failed (%d)", h.Host, h.MissedPings) + return false + } + + h.MissedPings = 0 + return true +} + +func (h *RemoteHostGRPC) Negotiate(knownHosts []string) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := h.ControlClient.Negotiate(ctx, &messages.NegotiateRequest{ + KnownHosts: knownHosts, + }) + if err != nil { + h.MissedPings++ + log.Printf("Negotiate %s failed: %v", h.Host, err) + return nil, err + } + h.MissedPings = 0 + return resp.Hosts, nil +} + +func (h *RemoteHostGRPC) AnnounceOwnership(uids []uint64) { + _, err := h.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ + Host: h.Host, + CartIds: uids, + }) + if err != nil { + log.Printf("ownership announce to %s failed: %v", h.Host, err) + h.MissedPings++ + return + } + h.MissedPings = 0 +} + +func (h *RemoteHostGRPC) AnnounceExpiry(uids []uint64) { + _, err := h.ControlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ + Host: h.Host, + CartIds: uids, + }) + if err != nil { + log.Printf("expiry announce to %s failed: %v", h.Host, err) + h.MissedPings++ + return + } + h.MissedPings = 0 +} + +func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { + target := fmt.Sprintf("%s%s", h.HTTPBase, r.URL.RequestURI()) + var bodyCopy []byte + if r.Body != nil && r.Body != http.NoBody { + var err error + bodyCopy, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, "proxy read error", http.StatusBadGateway) + return false, err + } + } + if r.Body != nil { + r.Body.Close() + } + var reqBody io.Reader + if len(bodyCopy) > 0 { + reqBody = bytes.NewReader(bodyCopy) + } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, reqBody) + if err != nil { + http.Error(w, "proxy build error", http.StatusBadGateway) + return false, err + } + r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) + req.Header.Set("X-Forwarded-Host", r.Host) + if idStr := id.String(); idStr != "" { + req.Header.Set("X-Cart-Id", idStr) + } + for k, v := range r.Header { + for _, vv := range v { + req.Header.Add(k, vv) + } + } + res, err := h.Client.Do(req) + if err != nil { + http.Error(w, "proxy request error", http.StatusBadGateway) + return false, err + } + defer res.Body.Close() + for k, v := range res.Header { + for _, vv := range v { + w.Header().Add(k, vv) + } + } + w.Header().Set("X-Cart-Owner-Routed", "true") + if res.StatusCode >= 200 && res.StatusCode <= 299 { + w.WriteHeader(res.StatusCode) + _, copyErr := io.Copy(w, res.Body) + if copyErr != nil { + return true, copyErr + } + return true, nil + } + return false, fmt.Errorf("proxy response status %d", res.StatusCode) +} + +func (r *RemoteHostGRPC) IsHealthy() bool { + return r.MissedPings < 3 +}