From 24cd0b6ad77c36de72dd5c3517a21029ee1f8eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Sat, 11 Oct 2025 10:22:47 +0200 Subject: [PATCH] major refactor --- grain-pool.go | 12 +- grpc_server.go | 26 +++- main.go | 7 +- ownership_middleware.go | 318 ---------------------------------------- pool-server.go | 229 +++++++++++++++-------------- synced-pool.go | 251 +++++++++++++++---------------- tcp-connection_test.go | 8 - 7 files changed, 278 insertions(+), 573 deletions(-) delete mode 100644 ownership_middleware.go delete mode 100644 tcp-connection_test.go diff --git a/grain-pool.go b/grain-pool.go index 8394394..f0d35d1 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -3,6 +3,7 @@ package main import ( "fmt" "log" + "net/http" "sync" "time" @@ -47,11 +48,16 @@ type GrainPool interface { Apply(id CartId, mutation interface{}) (*CartGrain, error) Get(id CartId) (*CartGrain, error) // OwnerHost returns the primary owner host for a given cart id. - OwnerHost(id CartId) string + OwnerHost(id CartId) (Host, bool) // Hostname returns the hostname of the local pool implementation. Hostname() string } +type Host interface { + Name() string + Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) +} + // Ttl keeps expiry info type Ttl struct { Expires time.Time @@ -269,8 +275,8 @@ func (p *GrainLocalPool) UnsafePointerToLegacyMap() uintptr { // OwnerHost implements the extended GrainPool interface for the standalone // local pool. Since the local pool has no concept of multi-host ownership, // it returns an empty string. Callers can treat empty as "local host". -func (p *GrainLocalPool) OwnerHost(id CartId) string { - return "" +func (p *GrainLocalPool) OwnerHost(id CartId) (Host, bool) { + return nil, false } // Hostname returns a blank string because GrainLocalPool does not track a node diff --git a/grpc_server.go b/grpc_server.go index 5ec4735..d4c4990 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -9,32 +9,42 @@ import ( messages "git.tornberg.me/go-cart-actor/proto" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" ) -// cartActorGRPCServer implements the CartActor and ControlPlane gRPC services. +// cartActorGRPCServer implements the ControlPlane gRPC services. // It delegates cart operations to a grain pool and cluster operations to a synced pool. type cartActorGRPCServer struct { messages.UnimplementedControlPlaneServer - pool GrainPool // For cart state mutations and queries + //pool GrainPool // For cart state mutations and queries syncedPool *SyncedPool // For cluster membership and control } // NewCartActorGRPCServer creates and initializes the server. -func NewCartActorGRPCServer(pool GrainPool, syncedPool *SyncedPool) *cartActorGRPCServer { +func NewCartActorGRPCServer(syncedPool *SyncedPool) *cartActorGRPCServer { return &cartActorGRPCServer{ - pool: pool, + //pool: pool, syncedPool: syncedPool, } } +func (s *cartActorGRPCServer) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { + for _, cartId := range req.CartIds { + s.syncedPool.removeLocalGrain(CartId(cartId)) + } + log.Printf("Ack count: %d", len(req.CartIds)) + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "ownership announced", + }, nil +} + // ControlPlane: Ping func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { // Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata. // Gateways that propagate Set-Cookie can help establish sticky sessions at the edge. - _ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) + //_ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) return &messages.PingReply{ Host: s.syncedPool.Hostname(), UnixTime: time.Now().Unix(), @@ -93,14 +103,14 @@ func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.Closing // StartGRPCServer configures and starts the unified gRPC server on the given address. // It registers both the CartActor and ControlPlane services. -func StartGRPCServer(addr string, pool GrainPool, syncedPool *SyncedPool) (*grpc.Server, error) { +func StartGRPCServer(addr string, syncedPool *SyncedPool) (*grpc.Server, error) { lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) } grpcServer := grpc.NewServer() - server := NewCartActorGRPCServer(pool, syncedPool) + server := NewCartActorGRPCServer(syncedPool) messages.RegisterControlPlaneServer(grpcServer, server) reflection.Register(grpcServer) diff --git a/main.go b/main.go index 3669eb3..dd9f4fa 100644 --- a/main.go +++ b/main.go @@ -164,19 +164,20 @@ func main() { if err != nil { log.Printf("Error loading state: %v\n", err) } + localPool := NewGrainLocalPool(2*65535, 15*time.Minute, spawn) app := &App{ - pool: NewGrainLocalPool(65535, 15*time.Minute, spawn), + pool: localPool, storage: storage, } - syncedPool, err := NewSyncedPool(app.pool, podIp, GetDiscovery()) + syncedPool, err := NewSyncedPool(localPool, podIp, GetDiscovery()) if err != nil { log.Fatalf("Error creating synced pool: %v\n", err) } // Start unified gRPC server (CartActor + ControlPlane) replacing legacy RPC server on :1337 // TODO: Remove any remaining legacy RPC server references and deprecated frame-based code after full gRPC migration is validated. - grpcSrv, err := StartGRPCServer(":1337", app.pool, syncedPool) + grpcSrv, err := StartGRPCServer(":1337", syncedPool) if err != nil { log.Fatalf("Error starting gRPC server: %v\n", err) } diff --git a/ownership_middleware.go b/ownership_middleware.go deleted file mode 100644 index 99d2c60..0000000 --- a/ownership_middleware.go +++ /dev/null @@ -1,318 +0,0 @@ -package main - -import ( - "bytes" - "io" - "net" - "net/http" - "os" - "strings" - "time" -) - -// OwnershipProxyMiddleware provides HTTP-layer routing to the primary owner -// of a cart before the request hits local handlers. -// -// Motivation: -// -// In the current system SyncedPool can proxy cart mutations to remote owners -// via remote grains (gRPC). For a simpler deployment you can instead forward -// the incoming HTTP request directly to the owning host and let only the -// owner execute the standard handlers (which apply mutations locally). -// -// Behavior: -// 1. Attempts to extract a cart id from (in priority order): -// - Cookie "cartid" -// - Path segment after "/byid/{id}" (e.g. /cart/byid/abc123/add/sku) -// 2. Resolves the primary owner host using the consistent hashing ring -// maintained by SyncedPool. -// 3. If the owner is the local host (or no id found), the request proceeds. -// 4. If the owner is a different host, the middleware performs an in-cluster -// HTTP proxy (single-hop) to http://:? -// and streams the response back to the client. -// 5. Adds headers: -// X-Cart-Owner: -// X-Cart-Owner-Routed: "true" (only when proxied) -// X-Cart-Id: (when available) -// On local handling (not proxied) X-Cart-Owner-Routed is "false". -// -// Configuration: -// -// CART_SERVICE_PORT (env) - target port for proxying (default: 8080) -// CART_PROXY_TIMEOUT_MS (env) - timeout for outbound proxy calls (default: 800) -// -// Integration: -// -// Wrap just the cart mux: -// -// cartMux := syncedServer.Serve() // existing cart handlers -// wrapped := OwnershipProxyMiddleware(syncedPool)(cartMux) -// mux.Handle("/cart/", http.StripPrefix("/cart", wrapped)) -// -// Fallbacks: -// -// If extraction or proxying fails, a 502 is returned (except missing cart id -// which simply skips routing). Timeouts produce 504. -// -// NOTE: -// - This does NOT (yet) support sticky upgrade / websockets. -// - Only primary ownership is considered (replicas ignored). -// - This keeps control plane & ring logic unmodified. -// -// You can gradually phase out remote grain logic by placing this middleware -// in front while leaving the rest of the code untouched. -func OwnershipProxyMiddleware(pool *SyncedPool) func(http.Handler) http.Handler { - localHost := pool.Hostname() - targetPort := envOr("CART_SERVICE_PORT", "8080") - timeout := envDurationOr("CART_PROXY_TIMEOUT_MS", 800*time.Millisecond) - - client := &http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - MaxIdleConnsPerHost: 32, - IdleConnTimeout: 90 * time.Second, - // Dialer with small timeouts to fail fast inside cluster - DialContext: (&net.Dialer{ - Timeout: 300 * time.Millisecond, - KeepAlive: 30 * time.Second, - }).DialContext, - }, - } - - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - // CORS preflight / safe methods that don't need routing without id. - if r.Method == http.MethodOptions { - next.ServeHTTP(w, r) - return - } - - cartId, ok := extractCartIdFromRequest(r) - if !ok || cartId.String() == "" { - // No cart id available -> cannot determine ownership; proceed locally. - w.Header().Set("X-Cart-Owner-Routed", "false") - next.ServeHTTP(w, r) - return - } - - owner := pool.OwnerHost(cartId) - w.Header().Set("X-Cart-Id", cartId.String()) - w.Header().Set("X-Cart-Owner", owner) - - // Route locally if we're the owner or owner resolution empty. - if owner == "" || owner == localHost { - w.Header().Set("X-Cart-Owner-Routed", "false") - next.ServeHTTP(w, r) - return - } - - // Proxy to remote owner - proxyURL := buildProxyURL(r, owner, targetPort) - bodyBuf, err := readBodyDuplicate(r) - if err != nil { - http.Error(w, "failed to read request body", http.StatusBadGateway) - return - } - - req, err := http.NewRequestWithContext(r.Context(), r.Method, proxyURL, bodyBuf) - if err != nil { - http.Error(w, "failed to create proxy request", http.StatusBadGateway) - return - } - copyHeaders(req.Header, r.Header) - // Ensure we don't forward hop-by-hop headers - cleanHopHeaders(req.Header) - req.Header.Set("X-Forwarded-For", appendForwardedFor(r)) - req.Header.Set("X-Forwarded-Host", r.Host) - req.Header.Set("X-Forwarded-Proto", schemeFromRequest(r)) - req.Header.Set("X-Cart-Forwarded", "true") - - start := time.Now() - resp, err := client.Do(req) - if err != nil { - if os.IsTimeout(err) || strings.Contains(err.Error(), "timeout") { - http.Error(w, "gateway timeout contacting owner", http.StatusGatewayTimeout) - return - } - http.Error(w, "upstream owner error", http.StatusBadGateway) - return - } - defer resp.Body.Close() - - // Copy status + headers - copyHeaders(w.Header(), resp.Header) - w.Header().Set("X-Cart-Owner-Routed", "true") - w.Header().Set("X-Cart-Owner-Latency-Ms", durationMs(time.Since(start))) - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) - }) - } -} - -// (Removed duplicate OwnerHost method; single implementation now lives in synced-pool.go) - -// extractCartIdFromRequest tries cookie first, then path form /byid/{id}/... -func extractCartIdFromRequest(r *http.Request) (CartId, bool) { - // Cookie - if c, err := r.Cookie("cartid"); err == nil && c.Value != "" { - if parsed, ok := ParseCartId(c.Value); ok { - return parsed, true - } - // Invalid existing cookie value: issue a fresh id (breaking change behavior) - newId := MustNewCartId() - return newId, true - } - // Path-based: locate "byid" segment - parts := splitPath(r.URL.Path) - for i := 0; i < len(parts); i++ { - if parts[i] == "byid" && i+1 < len(parts) { - raw := parts[i+1] - if raw != "" { - if parsed, ok := ParseCartId(raw); ok { - return parsed, true - } - } - } - } - var zero CartId - return zero, false -} - -// Helpers - -func envOr(key, def string) string { - if v := os.Getenv(key); v != "" { - return v - } - return def -} - -func envDurationOr(key string, def time.Duration) time.Duration { - if v := os.Getenv(key); v != "" { - if d, err := time.ParseDuration(v); err == nil { - return d - } - } - return def -} - -func buildProxyURL(r *http.Request, host, port string) string { - sb := &strings.Builder{} - sb.WriteString("http://") - sb.WriteString(host) - if port != "" { - sb.WriteString(":") - sb.WriteString(port) - } - // Preserve original path & query (already includes /cart prefix stripped? depends on where middleware placed) - sb.WriteString(r.URL.Path) - if rq := r.URL.RawQuery; rq != "" { - sb.WriteString("?") - sb.WriteString(rq) - } - return sb.String() -} - -func readBodyDuplicate(r *http.Request) (io.ReadCloser, error) { - if r.Body == nil { - return http.NoBody, nil - } - defer r.Body.Close() - buf, err := io.ReadAll(r.Body) - if err != nil { - return nil, err - } - // Restore original for downstream if local (we only call when proxying, but safe) - r.Body = io.NopCloser(bytes.NewReader(buf)) - return io.NopCloser(bytes.NewReader(buf)), nil -} - -func copyHeaders(dst, src http.Header) { - for k, vv := range src { - // Skip hop-by-hop; they'll be cleaned anyway - for _, v := range vv { - dst.Add(k, v) - } - } -} - -var hopHeaders = map[string]struct{}{ - "Connection": {}, - "Proxy-Connection": {}, - "Keep-Alive": {}, - "Proxy-Authenticate": {}, - "Proxy-Authorization": {}, - "Te": {}, - "Trailer": {}, - "Transfer-Encoding": {}, - "Upgrade": {}, -} - -func cleanHopHeaders(h http.Header) { - for k := range hopHeaders { - h.Del(k) - } -} - -func appendForwardedFor(r *http.Request) string { - host, _, _ := net.SplitHostPort(r.RemoteAddr) - if host == "" { - host = r.RemoteAddr - } - prior := r.Header.Get("X-Forwarded-For") - if prior == "" { - return host - } - return prior + ", " + host -} - -func schemeFromRequest(r *http.Request) string { - if r.TLS != nil { - return "https" - } - if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" { - return proto - } - return "http" -} - -func splitPath(p string) []string { - if p == "" || p == "/" { - return nil - } - trimmed := strings.TrimPrefix(p, "/") - if trimmed == "" { - return nil - } - parts := strings.Split(trimmed, "/") - return parts -} - -func durationMs(d time.Duration) string { - return strconvFormatInt(int64(d / time.Millisecond)) -} - -// strconvFormatInt is a tiny helper to avoid importing strconv for one use. -func strconvFormatInt(i int64) string { - // Fast int64 -> string (base 10) without strconv for small dependency surface. - if i == 0 { - return "0" - } - neg := i < 0 - if neg { - i = -i - } - var buf [20]byte - pos := len(buf) - for i > 0 { - pos-- - buf[pos] = byte('0' + (i % 10)) - i /= 10 - } - if neg { - pos-- - buf[pos] = '-' - } - return string(buf[pos:]) -} diff --git a/pool-server.go b/pool-server.go index b7850f8..fee45bb 100644 --- a/pool-server.go +++ b/pool-server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "io" "log" "net/http" "strconv" @@ -274,7 +273,7 @@ or panic-on-error helper: id := MustNewCartId() */ -func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(w http.ResponseWriter, r *http.Request) error { +func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { var id CartId cookie, err := r.Cookie("cartid") @@ -308,12 +307,12 @@ func CookieCartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId id = parsed } } - if ownershipProxyAfterExtraction != nil { - if handled, err := ownershipProxyAfterExtraction(id, w, r); handled || err != nil { - return err - } - } - return fn(w, r, id) + // if ownershipProxyAfterExtraction != nil { + // if handled, err := ownershipProxyAfterExtraction(id, w, r); handled || err != nil { + // return err + // } + // } + return fn(id, w, r) } } @@ -334,103 +333,113 @@ func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, ca return nil } -func CartIdHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(w http.ResponseWriter, r *http.Request) error { +func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { raw := r.PathValue("id") // If no id supplied, generate a new one if raw == "" { id := MustNewCartId() w.Header().Set("Set-Cart-Id", id.String()) - if ownershipProxyAfterExtraction != nil { - if handled, err := ownershipProxyAfterExtraction(id, w, r); handled || err != nil { - return err - } - } - return fn(w, r, id) + + return fn(id, w, r) } // Parse base62 cart id id, ok := ParseCartId(raw) if !ok { return fmt.Errorf("invalid cart id format") } - if ownershipProxyAfterExtraction != nil { - if handled, err := ownershipProxyAfterExtraction(id, w, r); handled || err != nil { - return err - } - } - return fn(w, r, id) + + return fn(id, w, r) } } -var ownershipProxyAfterExtraction func(cartId CartId, w http.ResponseWriter, r *http.Request) (handled bool, err error) +func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error { + return func(cartId CartId, w http.ResponseWriter, r *http.Request) error { + if ownerHost, ok := s.pool.OwnerHost(cartId); ok { + ok, err := ownerHost.Proxy(cartId, w, r) + if ok || err != nil { + log.Printf("proxy failed: %v", err) + // todo take ownership!! + } else { + return nil + } + } + // Local ownership or no owner known, proceed with local handling + + return fn(w, r, cartId) + + } +} + +//var ownershipProxyAfterExtraction func(cartId CartId, w http.ResponseWriter, r *http.Request) (handled bool, err error) func (s *PoolServer) Serve() *http.ServeMux { - // Install ownership proxy hook that runs AFTER id extraction (cookie OR path) - ownershipProxyAfterExtraction = func(cartId CartId, w http.ResponseWriter, r *http.Request) (bool, error) { - if cartId.String() == "" { - return false, nil - } - owner := s.pool.OwnerHost(cartId) - if owner == "" || owner == s.pool.Hostname() { - // Set / refresh cartowner cookie pointing to the local host (claim or already owned). - localHost := owner - if localHost == "" { - localHost = s.pool.Hostname() - } - http.SetCookie(w, &http.Cookie{ - Name: "cartowner", - Value: localHost, - Path: "/", - HttpOnly: true, - SameSite: http.SameSiteLaxMode, - }) - return false, nil - } - // For remote ownership set cartowner cookie to remote host for sticky sessions. - http.SetCookie(w, &http.Cookie{ - Name: "cartowner", - Value: owner, - Path: "/", - HttpOnly: true, - SameSite: http.SameSiteLaxMode, - }) - // Proxy logic (simplified): reuse existing request to owning host on same port. - target := "http://" + owner + r.URL.Path - if q := r.URL.RawQuery; q != "" { - target += "?" + q - } - req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) - if err != nil { - http.Error(w, "proxy build error", http.StatusBadGateway) - return true, err - } - for k, v := range r.Header { - for _, vv := range v { - req.Header.Add(k, vv) - } - } - req.Header.Set("X-Forwarded-Host", r.Host) - req.Header.Set("X-Cart-Id", cartId.String()) - req.Header.Set("X-Cart-Owner", owner) - resp, err := http.DefaultClient.Do(req) - if err != nil { - http.Error(w, "proxy upstream error", http.StatusBadGateway) - return true, err - } - defer resp.Body.Close() - for k, v := range resp.Header { - for _, vv := range v { - w.Header().Add(k, vv) - } - } - w.Header().Set("X-Cart-Owner-Routed", "true") - w.WriteHeader(resp.StatusCode) - _, copyErr := io.Copy(w, resp.Body) - if copyErr != nil { - return true, copyErr - } - return true, nil - } + // // Install ownership proxy hook that runs AFTER id extraction (cookie OR path) + // ownershipProxyAfterExtraction = func(cartId CartId, w http.ResponseWriter, r *http.Request) (bool, error) { + // if cartId.String() == "" { + // return false, nil + // } + // owner := s.pool.OwnerHost(cartId) + // if owner == "" || owner == s.pool.Hostname() { + // // Set / refresh cartowner cookie pointing to the local host (claim or already owned). + // localHost := owner + // if localHost == "" { + // localHost = s.pool.Hostname() + // } + // http.SetCookie(w, &http.Cookie{ + // Name: "cartowner", + // Value: localHost, + // Path: "/", + // HttpOnly: true, + // SameSite: http.SameSiteLaxMode, + // }) + // return false, nil + // } + // // For remote ownership set cartowner cookie to remote host for sticky sessions. + // http.SetCookie(w, &http.Cookie{ + // Name: "cartowner", + // Value: owner, + // Path: "/", + // HttpOnly: true, + // SameSite: http.SameSiteLaxMode, + // }) + // // Proxy logic (simplified): reuse existing request to owning host on same port. + // target := "http://" + owner + r.URL.Path + // if q := r.URL.RawQuery; q != "" { + // target += "?" + q + // } + // req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body) + // if err != nil { + // http.Error(w, "proxy build error", http.StatusBadGateway) + // return true, err + // } + // for k, v := range r.Header { + // for _, vv := range v { + // req.Header.Add(k, vv) + // } + // } + // req.Header.Set("X-Forwarded-Host", r.Host) + // req.Header.Set("X-Cart-Id", cartId.String()) + // req.Header.Set("X-Cart-Owner", owner) + // resp, err := http.DefaultClient.Do(req) + // if err != nil { + // http.Error(w, "proxy upstream error", http.StatusBadGateway) + // return true, err + // } + // defer resp.Body.Close() + // for k, v := range resp.Header { + // for _, vv := range v { + // w.Header().Add(k, vv) + // } + // } + // w.Header().Set("X-Cart-Owner-Routed", "true") + // w.WriteHeader(resp.StatusCode) + // _, copyErr := io.Copy(w, resp.Body) + // if copyErr != nil { + // return true, copyErr + // } + // return true, nil + // } mux := http.NewServeMux() mux.HandleFunc("OPTIONS /", func(w http.ResponseWriter, r *http.Request) { @@ -440,29 +449,29 @@ func (s *PoolServer) Serve() *http.ServeMux { w.WriteHeader(http.StatusOK) }) - mux.HandleFunc("GET /", ErrorHandler(CookieCartIdHandler(s.HandleGet))) - mux.HandleFunc("GET /add/{sku}", ErrorHandler(CookieCartIdHandler(s.HandleAddSku))) - mux.HandleFunc("POST /", ErrorHandler(CookieCartIdHandler(s.HandleAddRequest))) - mux.HandleFunc("POST /set", ErrorHandler(CookieCartIdHandler(s.HandleSetCartItems))) - mux.HandleFunc("DELETE /{itemId}", ErrorHandler(CookieCartIdHandler(s.HandleDeleteItem))) - mux.HandleFunc("PUT /", ErrorHandler(CookieCartIdHandler(s.HandleQuantityChange))) - mux.HandleFunc("DELETE /", ErrorHandler(CookieCartIdHandler(s.RemoveCartCookie))) - mux.HandleFunc("POST /delivery", ErrorHandler(CookieCartIdHandler(s.HandleSetDelivery))) - mux.HandleFunc("DELETE /delivery/{deliveryId}", ErrorHandler(CookieCartIdHandler(s.HandleRemoveDelivery))) - mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", ErrorHandler(CookieCartIdHandler(s.HandleSetPickupPoint))) - mux.HandleFunc("GET /checkout", ErrorHandler(CookieCartIdHandler(s.HandleCheckout))) - mux.HandleFunc("GET /confirmation/{orderId}", ErrorHandler(CookieCartIdHandler(s.HandleConfirmation))) + mux.HandleFunc("GET /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleGet)))) + mux.HandleFunc("GET /add/{sku}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleAddSku)))) + mux.HandleFunc("POST /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleAddRequest)))) + mux.HandleFunc("POST /set", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetCartItems)))) + mux.HandleFunc("DELETE /{itemId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleDeleteItem)))) + mux.HandleFunc("PUT /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleQuantityChange)))) + mux.HandleFunc("DELETE /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie)))) + mux.HandleFunc("POST /delivery", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))) + mux.HandleFunc("DELETE /delivery/{deliveryId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))) + mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))) + mux.HandleFunc("GET /checkout", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout)))) + mux.HandleFunc("GET /confirmation/{orderId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation)))) - mux.HandleFunc("GET /byid/{id}", ErrorHandler(CartIdHandler(s.HandleGet))) - mux.HandleFunc("GET /byid/{id}/add/{sku}", ErrorHandler(CartIdHandler(s.HandleAddSku))) - mux.HandleFunc("POST /byid/{id}", ErrorHandler(CartIdHandler(s.HandleAddRequest))) - mux.HandleFunc("DELETE /byid/{id}/{itemId}", ErrorHandler(CartIdHandler(s.HandleDeleteItem))) - mux.HandleFunc("PUT /byid/{id}", ErrorHandler(CartIdHandler(s.HandleQuantityChange))) - mux.HandleFunc("POST /byid/{id}/delivery", ErrorHandler(CartIdHandler(s.HandleSetDelivery))) - mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", ErrorHandler(CartIdHandler(s.HandleRemoveDelivery))) - mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", ErrorHandler(CartIdHandler(s.HandleSetPickupPoint))) - mux.HandleFunc("GET /byid/{id}/checkout", ErrorHandler(CartIdHandler(s.HandleCheckout))) - mux.HandleFunc("GET /byid/{id}/confirmation", ErrorHandler(CartIdHandler(s.HandleConfirmation))) + mux.HandleFunc("GET /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleGet)))) + mux.HandleFunc("GET /byid/{id}/add/{sku}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleAddSku)))) + mux.HandleFunc("POST /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleAddRequest)))) + mux.HandleFunc("DELETE /byid/{id}/{itemId}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleDeleteItem)))) + mux.HandleFunc("PUT /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleQuantityChange)))) + mux.HandleFunc("POST /byid/{id}/delivery", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))) + mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))) + mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))) + mux.HandleFunc("GET /byid/{id}/checkout", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleCheckout)))) + mux.HandleFunc("GET /byid/{id}/confirmation", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleConfirmation)))) return mux } diff --git a/synced-pool.go b/synced-pool.go index 65f9321..65db9ad 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -3,11 +3,14 @@ package main import ( "context" "fmt" + "io" "log" + "net/http" "reflect" "sync" "time" + messages "git.tornberg.me/go-cart-actor/proto" proto "git.tornberg.me/go-cart-actor/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -31,7 +34,7 @@ type SyncedPool struct { // New ownership tracking (first-touch / announcement model) // remoteOwners maps cart id -> owning host (excluding locally owned carts which live in local.grains) - remoteOwners map[CartId]string + remoteOwners map[CartId]*RemoteHostGRPC mu sync.RWMutex @@ -46,10 +49,55 @@ type SyncedPool struct { type RemoteHostGRPC struct { Host string Conn *grpc.ClientConn + Transport *http.Transport + Client *http.Client ControlClient proto.ControlPlaneClient MissedPings int } +func (h *RemoteHostGRPC) Name() string { + return h.Host +} + +func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { + + req, err := http.NewRequestWithContext(r.Context(), r.Method, h.Host, r.Body) + + if err != nil { + http.Error(w, "proxy build error", http.StatusBadGateway) + return true, err + } + for k, v := range r.Header { + for _, vv := range v { + req.Header.Add(k, vv) + } + } + res, err := h.Client.Do(req) + if err != nil { + http.Error(w, "proxy request error", http.StatusBadGateway) + return true, err + } + defer res.Body.Close() + + for k, v := range res.Header { + for _, vv := range v { + w.Header().Add(k, vv) + } + } + w.Header().Set("X-Cart-Owner-Routed", "true") + if res.StatusCode >= 200 && res.StatusCode <= 299 { + w.WriteHeader(res.StatusCode) + _, copyErr := io.Copy(w, res.Body) + if copyErr != nil { + return true, copyErr + } + return true, nil + } + + return false, fmt.Errorf("proxy response status %d", res.StatusCode) + +} + func (r *RemoteHostGRPC) IsHealthy() bool { return r.MissedPings < 3 } @@ -87,12 +135,10 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) LocalHostname: hostname, local: local, remoteHosts: make(map[string]*RemoteHostGRPC), - remoteOwners: make(map[CartId]string), + remoteOwners: make(map[CartId]*RemoteHostGRPC), discardedHostHandler: NewDiscardedHostHandler(1338), } p.discardedHostHandler.SetReconnectHandler(p.AddRemote) - // Initialize empty ring (will be rebuilt after first AddRemote or discovery event) - p.rebuildRing() if discovery != nil { go func() { @@ -143,9 +189,9 @@ func (p *SyncedPool) AddRemote(host string) { p.mu.Unlock() target := fmt.Sprintf("%s:1337", host) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock()) + //ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + conn, err := grpc.NewClient(target) //grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Printf("AddRemote: dial %s failed: %v", target, err) return @@ -170,11 +216,22 @@ func (p *SyncedPool) AddRemote(host string) { return } } + transport := &http.Transport{ + MaxIdleConns: 100, // Maximum idle connections + MaxIdleConnsPerHost: 100, // Maximum idle connections per host + IdleConnTimeout: 120 * time.Second, // Timeout for idle connections + } + + client := &http.Client{ + Transport: transport, + Timeout: 10 * time.Second, // Request timeout + } remote := &RemoteHostGRPC{ - Host: host, - Conn: conn, - + Host: host, + Conn: conn, + Transport: transport, + Client: client, ControlClient: controlClient, MissedPings: 0, } @@ -184,7 +241,7 @@ func (p *SyncedPool) AddRemote(host string) { p.mu.Unlock() connectedRemotes.Set(float64(p.RemoteCount())) // Rebuild consistent hashing ring including this new host - p.rebuildRing() + //p.rebuildRing() log.Printf("Connected to remote host %s", host) @@ -209,7 +266,7 @@ func (p *SyncedPool) initializeRemote(remote *RemoteHostGRPC) { // Only set if not already claimed (first claim wins) if _, exists := p.remoteOwners[CartId(cid)]; !exists { - p.remoteOwners[CartId(cid)] = remote.Host + p.remoteOwners[CartId(cid)] = remote } count++ } @@ -226,7 +283,7 @@ func (p *SyncedPool) RemoveHost(host string) { } // purge remote ownership entries for this host for id, h := range p.remoteOwners { - if h == host { + if h.Host == host { delete(p.remoteOwners, id) } } @@ -237,7 +294,7 @@ func (p *SyncedPool) RemoveHost(host string) { } connectedRemotes.Set(float64(p.RemoteCount())) // Rebuild ring after host removal - p.rebuildRing() + // p.rebuildRing() } // RemoteCount returns number of tracked remote hosts. @@ -355,29 +412,6 @@ func (p *SyncedPool) GetHealthyRemotes() []*RemoteHostGRPC { return ret } -// rebuildRing removed (ring no longer used in first-touch ownership model) -func (p *SyncedPool) rebuildRing() {} - -// (All ring construction & metrics removed) - -// ForceRingRefresh kept as no-op for backward compatibility. -func (p *SyncedPool) ForceRingRefresh() {} - -// ownersFor removed (ring-based ownership deprecated) -func (p *SyncedPool) ownersFor(id CartId) []string { - return []string{p.LocalHostname} -} - -// ownerHostFor retained as wrapper to satisfy existing calls (always local) -func (p *SyncedPool) ownerHostFor(id CartId) string { - return p.LocalHostname -} - -// DebugOwnerHost exposes (for tests) the currently computed primary owner host. -func (p *SyncedPool) DebugOwnerHost(id CartId) string { - return p.ownerHostFor(id) -} - func (p *SyncedPool) removeLocalGrain(id CartId) { p.mu.Lock() delete(p.local.grains, uint64(id)) @@ -398,42 +432,33 @@ var ErrNotOwner = fmt.Errorf("not owner") // // NOTE: This does NOT (yet) reconcile conflicting announcements; first claim // wins. Later improvements can add tie-break via timestamp or host ordering. -func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) (string, error) { +func (p *SyncedPool) resolveOwnerFirstTouch(id CartId) error { // Fast local existence check p.local.mu.RLock() _, existsLocal := p.local.grains[uint64(id)] p.local.mu.RUnlock() if existsLocal { - return p.LocalHostname, nil + return nil } // Remote ownership map lookup p.mu.RLock() remoteHost, foundRemote := p.remoteOwners[id] p.mu.RUnlock() - if foundRemote && remoteHost != "" { - return remoteHost, nil + if foundRemote && remoteHost.Host != "" { + log.Printf("other owner exists %s", remoteHost.Host) + return nil } // Claim: spawn locally _, err := p.local.GetGrain(id) if err != nil { - return "", err + return err } - // Record (defensive) in remoteOwners pointing to self (not strictly needed - // for local queries, but keeps a single lookup structure). - p.mu.Lock() - if _, stillMissing := p.remoteOwners[id]; !stillMissing { - // Another goroutine inserted meanwhile; keep theirs (first claim wins). - } else { - p.remoteOwners[id] = p.LocalHostname - } - p.mu.Unlock() - // Announce asynchronously go p.broadcastOwnership([]CartId{id}) - return p.LocalHostname, nil + return nil } // broadcastOwnership sends an AnnounceOwnership RPC to all healthy remotes. @@ -442,33 +467,25 @@ func (p *SyncedPool) broadcastOwnership(ids []CartId) { if len(ids) == 0 { return } - // Prepare payload (convert to string slice) - payload := make([]string, 0, len(ids)) + + uids := make([]uint64, 0, len(ids)) for _, id := range ids { - if id.String() != "" { - payload = append(payload, id.String()) - } - } - if len(payload) == 0 { - return + uids = append(uids, uint64(id)) } p.mu.RLock() - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) + defer p.mu.RUnlock() + for _, r := range p.remoteHosts { if r.IsHealthy() { - remotes = append(remotes, r) + go func(rh *RemoteHostGRPC) { + rh.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ + Host: p.LocalHostname, + CartIds: uids, + }) + }(r) } } - p.mu.RUnlock() - - for _, r := range remotes { - go func(rh *RemoteHostGRPC) { - // AnnounceOwnership RPC not yet available (proto regeneration pending); no-op broadcast for now. - // Intended announcement: host=p.LocalHostname ids=payload - _ = rh - }(r) - } } // AdoptRemoteOwnership processes an incoming ownership announcement for cart ids. @@ -476,6 +493,10 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { if host == "" || host == p.LocalHostname { return } + remoteHost, ok := p.remoteHosts[host] + if !ok { + log.Printf("remote host does not exist!!") + } p.mu.Lock() defer p.mu.Unlock() for _, s := range ids { @@ -488,7 +509,7 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { } id := parsed // Do not overwrite if already claimed by another host (first wins). - if existing, ok := p.remoteOwners[id]; ok && existing != host { + if existing, ok := p.remoteOwners[id]; ok && existing != remoteHost { continue } // Skip if we own locally (local wins for our own process) @@ -498,7 +519,7 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { if localHas { continue } - p.remoteOwners[id] = host + p.remoteOwners[id] = remoteHost } } @@ -506,20 +527,13 @@ func (p *SyncedPool) AdoptRemoteOwnership(host string, ids []string) { // the first-touch model. If another host owns the cart, ErrNotOwner is returned. // Remote grain proxy logic and ring-based spawning have been removed. func (p *SyncedPool) getGrain(id CartId) (Grain, error) { - owner, err := p.resolveOwnerFirstTouch(id) - if err != nil { - return nil, err - } - if owner != p.LocalHostname { - // Another host owns it; signal caller to proxy / forward. - return nil, ErrNotOwner - } // Owner is local (either existing or just claimed), fetch/create grain. grain, err := p.local.GetGrain(id) if err != nil { return nil, err } + p.resolveOwnerFirstTouch(id) return grain, nil } @@ -528,27 +542,31 @@ func (p *SyncedPool) getGrain(id CartId) (Grain, error) { // to replica owners (best-effort) and reconcile quorum on read. func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { grain, err := p.getGrain(id) - if err == ErrNotOwner { - // Remote owner reported but either unreachable or failed earlier in stack. - // Takeover strategy: remove remote mapping (first-touch override) and claim locally. - p.mu.Lock() - delete(p.remoteOwners, id) - p.mu.Unlock() - if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { - return nil, terr - } else if owner == p.LocalHostname { - // Fetch (now-local) grain - grain, err = p.local.GetGrain(id) - if err != nil { - return nil, err - } - } else { - // Another host reclaimed before us; treat as not owner. - return nil, ErrNotOwner - } - } else if err != nil { + if err != nil { + log.Printf("could not get grain %v", err) return nil, err } + // if err == ErrNotOwner { + // // Remote owner reported but either unreachable or failed earlier in stack. + // // Takeover strategy: remove remote mapping (first-touch override) and claim locally. + // p.mu.Lock() + // delete(p.remoteOwners, id) + // p.mu.Unlock() + // if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { + // return nil, terr + // } else if owner == p.LocalHostname { + // // Fetch (now-local) grain + // grain, err = p.local.GetGrain(id) + // if err != nil { + // return nil, err + // } + // } else { + // // Another host reclaimed before us; treat as not owner. + // return nil, ErrNotOwner + // } + // } else if err != nil { + // return nil, err + // } start := time.Now() result, applyErr := grain.Apply(mutation, false) @@ -569,10 +587,10 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) if applyErr == nil && result != nil { cartMutationsTotal.Inc() - if p.ownerHostFor(id) == p.LocalHostname { - // Update active grains gauge only for local ownership - cartActiveGrains.Set(float64(p.local.DebugGrainCount())) - } + //if p.ownerHostFor(id) == p.LocalHostname { + // Update active grains gauge only for local ownership + cartActiveGrains.Set(float64(p.local.DebugGrainCount())) + //} } else if applyErr != nil { cartMutationFailuresTotal.Inc() } @@ -583,22 +601,8 @@ func (p *SyncedPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) // Future replication hook: Read-repair or quorum read can be added here. func (p *SyncedPool) Get(id CartId) (*CartGrain, error) { grain, err := p.getGrain(id) - if err == ErrNotOwner { - // Attempt takeover on read as well (e.g. owner dead). - p.mu.Lock() - delete(p.remoteOwners, id) - p.mu.Unlock() - if owner, terr := p.resolveOwnerFirstTouch(id); terr != nil { - return nil, terr - } else if owner == p.LocalHostname { - grain, err = p.local.GetGrain(id) - if err != nil { - return nil, err - } - } else { - return nil, ErrNotOwner - } - } else if err != nil { + if err != nil { + log.Printf("could not get grain %v", err) return nil, err } return grain.GetCurrentState() @@ -631,6 +635,7 @@ func (p *SyncedPool) Hostname() string { } // OwnerHost returns the primary owning host for a given cart id (ring lookup). -func (p *SyncedPool) OwnerHost(id CartId) string { - return p.ownerHostFor(id) +func (p *SyncedPool) OwnerHost(id CartId) (Host, bool) { + ownerHost, ok := p.remoteOwners[id] + return ownerHost, ok } diff --git a/tcp-connection_test.go b/tcp-connection_test.go deleted file mode 100644 index cbe37dd..0000000 --- a/tcp-connection_test.go +++ /dev/null @@ -1,8 +0,0 @@ -/* -Legacy TCP networking (GenericListener / Frame protocol) has been removed -as part of the gRPC migration. This file intentionally contains no tests. - -Keeping an empty Go file (with a package declaration) ensures the old -tcp-connection test target no longer runs without causing build issues. -*/ -package main