Complete refactor to the new gRPC control plane and HTTP-only proxying for carts #4

Merged
mats merged 75 commits from refactor/http-proxy into main 2025-10-14 22:31:28 +02:00
29 changed files with 584 additions and 654 deletions
Showing only changes of commit 1575b3a829

cart (binary executable file, content not shown)

View File

@@ -1,543 +0,0 @@
package main
import (
"fmt"
"log"
"maps"
"reflect"
"sync"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/discovery"
"git.tornberg.me/go-cart-actor/pkg/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// ---------------------------------------------------------------------------
// Metrics shared by the cart pool implementation.
// ---------------------------------------------------------------------------
var (
poolGrains = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grains_in_pool",
Help: "The total number of grains in the local pool",
})
poolSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_pool_size",
Help: "Configured capacity of the cart pool",
})
poolUsage = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_grain_pool_usage",
Help: "Current utilisation of the cart pool",
})
negotiationCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_negotiation_total",
Help: "The total number of remote host negotiations",
})
connectedRemotes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_connected_remotes",
Help: "Number of connected remote hosts",
})
cartMutationsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutations_total",
Help: "Total number of cart state mutations applied",
})
cartMutationFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_mutation_failures_total",
Help: "Total number of failed cart state mutations",
})
cartMutationLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cart_mutation_latency_seconds",
Help: "Latency of cart mutations in seconds",
Buckets: prometheus.DefBuckets,
}, []string{"mutation"})
)
// GrainPool is the interface exposed to HTTP handlers and other subsystems.
// CartPool merges the responsibilities that previously belonged to
// GrainLocalPool and SyncedPool. It provides local grain storage together
// with cluster coordination, ownership negotiation and expiry signalling.
type CartPool struct {
// Local grain state -----------------------------------------------------
localMu sync.RWMutex
grains map[uint64]*CartGrain
spawn func(id CartId) (*CartGrain, error)
ttl time.Duration
poolSize int
// Cluster coordination --------------------------------------------------
hostname string
remoteMu sync.RWMutex
remoteOwners map[uint64]*proxy.RemoteHost
remoteHosts map[string]*proxy.RemoteHost
//discardedHostHandler *DiscardedHostHandler
// House-keeping ---------------------------------------------------------
purgeTicker *time.Ticker
}
// NewCartPool constructs a unified pool. Discovery may be nil for standalone
// deployments.
func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), hostWatch discovery.Discovery) (*CartPool, error) {
p := &CartPool{
grains: make(map[uint64]*CartGrain),
spawn: spawn,
ttl: ttl,
poolSize: size,
hostname: hostname,
remoteOwners: make(map[uint64]*proxy.RemoteHost),
remoteHosts: make(map[string]*proxy.RemoteHost),
}
p.purgeTicker = time.NewTicker(time.Minute)
go func() {
for range p.purgeTicker.C {
p.purge()
}
}()
if hostWatch != nil {
go p.startDiscovery(hostWatch)
} else {
log.Printf("No discovery configured; expecting manual AddRemote or static host injection")
}
return p, nil
}
func (p *CartPool) purge() {
purgeLimit := time.Now().Add(-p.ttl)
purgedIds := make([]uint64, 0, len(p.grains))
p.localMu.Lock()
for id, grain := range p.grains {
if grain.GetLastAccess().Before(purgeLimit) {
purgedIds = append(purgedIds, id)
delete(p.grains, id)
}
}
p.localMu.Unlock()
p.forAllHosts(func(remote *proxy.RemoteHost) {
remote.AnnounceExpiry(purgedIds)
})
}
// startDiscovery subscribes to cluster events and adds/removes hosts.
func (p *CartPool) startDiscovery(discovery discovery.Discovery) {
time.Sleep(3 * time.Second) // allow gRPC server startup
log.Printf("Starting discovery watcher")
ch, err := discovery.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if p.IsKnown(evt.Host) {
p.RemoveHost(evt.Host)
}
default:
if !p.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
p.AddRemote(evt.Host)
}
}
}
}
// ---------------------------------------------------------------------------
// Local grain management
// ---------------------------------------------------------------------------
func (p *CartPool) statsUpdate() {
p.localMu.RLock()
size := len(p.grains)
cap := p.poolSize
p.localMu.RUnlock()
poolGrains.Set(float64(size))
poolSize.Set(float64(cap))
if cap > 0 {
poolUsage.Set(float64(size) / float64(cap))
}
}
// LocalUsage returns the number of resident grains and configured capacity.
func (p *CartPool) LocalUsage() (int, int) {
p.localMu.RLock()
defer p.localMu.RUnlock()
return len(p.grains), p.poolSize
}
// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
func (p *CartPool) GetLocalIds() []uint64 {
p.localMu.RLock()
defer p.localMu.RUnlock()
ids := make([]uint64, 0, len(p.grains))
for _, g := range p.grains {
if g == nil {
continue
}
ids = append(ids, uint64(g.GetId()))
}
return ids
}
func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) error {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, id := range ids {
delete(p.remoteOwners, id)
}
return nil
}
func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error {
p.remoteMu.RLock()
remoteHost, exists := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !exists {
createdHost, err := p.AddRemote(host)
if err != nil {
return err
}
remoteHost = createdHost
}
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
p.localMu.Lock()
defer p.localMu.Unlock()
for _, id := range ids {
log.Printf("Handling ownership change for cart %d to host %s", id, host)
delete(p.grains, id)
p.remoteOwners[id] = remoteHost
}
return nil
}
// SnapshotGrains returns a copy of the currently resident grains keyed by id.
func (p *CartPool) SnapshotGrains() map[uint64]*CartGrain {
p.localMu.RLock()
defer p.localMu.RUnlock()
out := maps.Clone(p.grains)
return out
}
// func (p *CartPool) getLocalGrain(key uint64) (*CartGrain, error) {
// grainLookups.Inc()
// p.localMu.RLock()
// grain, ok := p.grains[key]
// p.localMu.RUnlock()
// if grain != nil && ok {
// return grain, nil
// }
// go p.statsUpdate()
// return grain, nil
// }
// ---------------------------------------------------------------------------
// Cluster ownership and coordination
// ---------------------------------------------------------------------------
func (p *CartPool) TakeOwnership(id uint64) {
if p.grains[id] != nil {
return
}
log.Printf("taking ownership of: %d", id)
p.broadcastOwnership([]uint64{id})
}
func (p *CartPool) AddRemote(host string) (*proxy.RemoteHost, error) {
if host == "" || host == p.hostname || p.IsKnown(host) {
return nil, fmt.Errorf("invalid host")
}
remote, err := proxy.NewRemoteHost(host)
if err != nil {
log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err)
return nil, err
}
p.remoteMu.Lock()
p.remoteHosts[host] = remote
p.remoteMu.Unlock()
connectedRemotes.Set(float64(p.RemoteCount()))
log.Printf("Connected to remote host %s", host)
go p.pingLoop(remote)
go p.initializeRemote(remote)
go p.SendNegotiation()
return remote, nil
}
func (p *CartPool) initializeRemote(remote *proxy.RemoteHost) {
remotesIds := remote.GetActorIds()
p.remoteMu.Lock()
for _, id := range remotesIds {
p.localMu.Lock()
delete(p.grains, id)
p.localMu.Unlock()
if _, exists := p.remoteOwners[id]; !exists {
p.remoteOwners[id] = remote
}
}
p.remoteMu.Unlock()
}
func (p *CartPool) RemoveHost(host string) {
p.remoteMu.Lock()
remote, exists := p.remoteHosts[host]
if exists {
go remote.Close()
delete(p.remoteHosts, host)
}
for id, owner := range p.remoteOwners {
if owner.Host == host {
delete(p.remoteOwners, id)
}
}
p.remoteMu.Unlock()
if exists {
remote.Close()
}
connectedRemotes.Set(float64(p.RemoteCount()))
}
func (p *CartPool) RemoteCount() int {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
return len(p.remoteHosts)
}
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *CartPool) RemoteHostNames() []string {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
hosts := make([]string, 0, len(p.remoteHosts))
for host := range p.remoteHosts {
hosts = append(hosts, host)
}
return hosts
}
func (p *CartPool) IsKnown(host string) bool {
if host == p.hostname {
return true
}
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
_, ok := p.remoteHosts[host]
return ok
}
func (p *CartPool) pingLoop(remote *proxy.RemoteHost) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !remote.Ping() {
if !remote.IsHealthy() {
log.Printf("Remote %s unhealthy, removing", remote.Host)
p.Close()
p.RemoveHost(remote.Host)
return
}
continue
}
}
}
func (p *CartPool) IsHealthy() bool {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
for _, r := range p.remoteHosts {
if !r.IsHealthy() {
return false
}
}
return true
}
func (p *CartPool) Negotiate(otherHosts []string) {
for _, host := range otherHosts {
if host != p.hostname {
p.remoteMu.RLock()
_, ok := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !ok {
go p.AddRemote(host)
}
}
}
}
func (p *CartPool) SendNegotiation() {
negotiationCount.Inc()
p.remoteMu.RLock()
hosts := make([]string, 0, len(p.remoteHosts)+1)
hosts = append(hosts, p.hostname)
remotes := make([]*proxy.RemoteHost, 0, len(p.remoteHosts))
for h, r := range p.remoteHosts {
hosts = append(hosts, h)
remotes = append(remotes, r)
}
p.remoteMu.RUnlock()
p.forAllHosts(func(remote *proxy.RemoteHost) {
knownByRemote, err := remote.Negotiate(hosts)
if err != nil {
log.Printf("Negotiate with %s failed: %v", remote.Host, err)
return
}
for _, h := range knownByRemote {
if !p.IsKnown(h) {
go p.AddRemote(h)
}
}
})
}
func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) {
p.remoteMu.RLock()
rh := maps.Clone(p.remoteHosts)
p.remoteMu.RUnlock()
wg := sync.WaitGroup{}
for _, host := range rh {
wg.Go(func() { fn(host) })
}
for name, host := range rh {
if !host.IsHealthy() {
host.Close()
p.remoteMu.Lock()
delete(p.remoteHosts, name)
p.remoteMu.Unlock()
}
}
}
func (p *CartPool) broadcastOwnership(ids []uint64) {
if len(ids) == 0 {
return
}
p.forAllHosts(func(rh *proxy.RemoteHost) {
rh.AnnounceOwnership(ids)
})
}
func (p *CartPool) getOrClaimGrain(id uint64) (*CartGrain, error) {
p.localMu.RLock()
grain, exists := p.grains[id]
p.localMu.RUnlock()
if exists && grain != nil {
return grain, nil
}
grain, err := p.spawn(CartId(id))
if err != nil {
return nil, err
}
p.localMu.Lock()
p.grains[id] = grain
p.localMu.Unlock()
go p.broadcastOwnership([]uint64{id})
return grain, nil
}
// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain.
func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
start := time.Now()
result, applyErr := grain.Apply(mutation, false)
mutationType := "unknown"
if mutation != nil {
if t := reflect.TypeOf(mutation); t != nil {
if t.Kind() == reflect.Pointer {
t = t.Elem()
}
if t.Name() != "" {
mutationType = t.Name()
}
}
}
cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
if applyErr == nil && result != nil {
cartMutationsTotal.Inc()
} else if applyErr != nil {
cartMutationFailuresTotal.Inc()
}
return result, applyErr
}
// Get returns the current state of a grain.
func (p *CartPool) Get(id uint64) (*CartGrain, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
return grain.GetCurrentState()
}
// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *CartPool) OwnerHost(id uint64) (actor.Host, bool) {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
owner, ok := p.remoteOwners[id]
return owner, ok
}
// Hostname returns the local hostname (pod IP).
func (p *CartPool) Hostname() string {
return p.hostname
}
// Close notifies remotes that this host is shutting down.
func (p *CartPool) Close() {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, r := range p.remoteHosts {
go func(rh *proxy.RemoteHost) {
rh.Close()
}(r)
}
if p.purgeTicker != nil {
p.purgeTicker.Stop()
}
}
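For reference (not part of this diff): the Apply method above labels the mutation-latency histogram with the mutation's concrete type name via reflection. Below is a standalone sketch of just that labeling step; the AddItem type is a hypothetical stand-in, not one of the real cart mutations.

package main

import (
    "fmt"
    "reflect"
)

// mutationLabel mirrors the labeling logic in CartPool.Apply: dereference
// pointer types and fall back to "unknown" for nil or anonymous mutations.
func mutationLabel(mutation any) string {
    if mutation == nil {
        return "unknown"
    }
    t := reflect.TypeOf(mutation)
    if t.Kind() == reflect.Pointer {
        t = t.Elem()
    }
    if t.Name() == "" {
        return "unknown"
    }
    return t.Name()
}

// AddItem is a hypothetical mutation type used only for this example.
type AddItem struct{ Sku string }

func main() {
    fmt.Println(mutationLabel(&AddItem{Sku: "abc"})) // AddItem
    fmt.Println(mutationLabel(nil))                  // unknown
}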

View File

@@ -76,8 +76,8 @@ type CartGrain struct {
PaymentStatus string `json:"paymentStatus,omitempty"`
}
-func (c *CartGrain) GetId() CartId {
-return c.Id
+func (c *CartGrain) GetId() uint64 {
+return uint64(c.Id)
}
func (c *CartGrain) GetLastChange() time.Time {
@@ -222,7 +222,6 @@ func GetTaxAmount(total int64, tax int) int64 {
}
func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error) {
grainMutations.Inc()
updated, err := ApplyRegistered(c, content)
if err != nil {

View File

@@ -15,9 +15,11 @@ import (
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/discovery"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
@@ -37,13 +39,13 @@ var (
})
)
-func spawn(id CartId) (*CartGrain, error) {
+func spawn(id uint64) (actor.Grain[CartGrain], error) {
grainSpawns.Inc()
ret := &CartGrain{
lastItemId: 0,
lastDeliveryId: 0,
Deliveries: []*CartDelivery{},
-Id: id,
+Id: CartId(id),
Items: []*CartItem{},
TotalPrice: 0,
}
@@ -53,7 +55,7 @@ func spawn(id CartId) (*CartGrain, error) {
// Legacy loadMessages (no-op) retained; then replay append-only event log
//_ = loadMessages(ret, id)
-err := ReplayCartEvents(ret, id)
+err := ReplayCartEvents(ret, CartId(id))
return ret, err
}
@@ -63,30 +65,13 @@ func init() {
}
type App struct {
-pool *CartPool
+pool *actor.SimpleGrainPool[CartGrain]
storage *DiskStorage
}
// func (a *App) Save() error {
// for id, grain := range a.pool.SnapshotGrains() {
// if grain == nil {
// continue
// }
// if grain.GetLastChange().After(a.storage.LastSaves[uint64(id)]) {
// err := a.storage.Store(id, grain)
// if err != nil {
// log.Printf("Error saving grain %s: %v\n", id, err)
// }
// }
// }
// return nil
// }
var podIp = os.Getenv("POD_IP")
var name = os.Getenv("POD_NAME")
var amqpUrl = os.Getenv("AMQP_URL")
var KlarnaInstance = NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
var tpl = `<!DOCTYPE html>
<html lang="en">
@@ -127,11 +112,15 @@ func GetDiscovery() discovery.Discovery {
}
func main() {
controlPlaneConfig := actor.DefaultServerConfig()
storage, err := NewDiskStorage(fmt.Sprintf("data/s_%s.gob", name))
if err != nil {
log.Printf("Error loading state: %v\n", err)
}
-pool, err := NewCartPool(2*65535, 15*time.Minute, podIp, spawn, GetDiscovery())
+pool, err := actor.NewSimpleGrainPool(2*65535, 15*time.Minute, podIp, spawn, func(host string) (actor.Host, error) {
+return proxy.NewRemoteHost(host)
+})
if err != nil {
log.Fatalf("Error creating cart pool: %v\n", err)
}
@@ -140,31 +129,52 @@ func main() {
storage: storage,
}
-grpcSrv, err := actor.NewControlServer[*CartGrain](":1337", pool)
+grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool)
if err != nil {
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
}
defer grpcSrv.GracefulStop()
// go func() {
// for range time.Tick(time.Minute * 5) {
// err := app.Save()
// if err != nil {
// log.Printf("Error saving: %v\n", err)
// }
// }
// }()
go func(hw discovery.Discovery) {
if hw == nil {
log.Print("No discovery service available")
return
}
ch, err := hw.Watch()
if err != nil {
log.Printf("Discovery error: %v", err)
return
}
for evt := range ch {
if evt.Host == "" {
continue
}
switch evt.Type {
case watch.Deleted:
if pool.IsKnown(evt.Host) {
pool.RemoveHost(evt.Host)
}
default:
if !pool.IsKnown(evt.Host) {
log.Printf("Discovered host %s", evt.Host)
pool.AddRemote(evt.Host)
}
}
}
}(GetDiscovery())
orderHandler := &AmqpOrderHandler{
Url: amqpUrl,
}
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
-syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp))
+syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
mux := http.NewServeMux()
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
// only for local
// mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
// syncedPool.AddRemote(r.PathValue("host"))
// })
mux.HandleFunc("GET /add/remote/{host}", func(w http.ResponseWriter, r *http.Request) {
pool.AddRemote(r.PathValue("host"))
})
// mux.HandleFunc("GET /save", app.HandleSave)
//mux.HandleFunc("/", app.RewritePath)
mux.HandleFunc("/debug/pprof/", pprof.Index)
@@ -221,30 +231,42 @@ func main() {
return
}
cartId := parsed
-order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
+syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error {
+order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
+if err != nil {
+return err
+}
+w.Header().Set("Content-Type", "text/html; charset=utf-8")
+w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
+w.WriteHeader(http.StatusOK)
+fmt.Fprintf(w, tpl, order.HTMLSnippet)
+return nil
+})(cartId, w, r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
}
// v2: Apply now returns *CartGrain; order creation handled inside grain (no payload to unmarshal)
} else {
-order, err = KlarnaInstance.GetOrder(orderId)
+order, err = klarnaClient.GetOrder(orderId)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, tpl, order.HTMLSnippet)
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, tpl, order.HTMLSnippet)
})
mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) {
orderId := r.PathValue("order_id")
-order, err := KlarnaInstance.GetOrder(orderId)
+order, err := klarnaClient.GetOrder(orderId)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@@ -304,7 +326,7 @@ func main() {
orderId := r.URL.Query().Get("order_id")
log.Printf("Order confirmation push: %s", orderId)
-order, err := KlarnaInstance.GetOrder(orderId)
+order, err := klarnaClient.GetOrder(orderId)
if err != nil {
log.Printf("Error creating request: %v\n", err)
@@ -325,7 +347,7 @@ func main() {
w.WriteHeader(http.StatusInternalServerError)
return
}
-err = KlarnaInstance.AcknowledgeOrder(orderId)
+err = klarnaClient.AcknowledgeOrder(orderId)
if err != nil {
log.Printf("Error acknowledging order: %v\n", err)
}

View File

@@ -9,18 +9,21 @@ import (
"strconv"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
)
type PoolServer struct {
-pod_name string
-pool *CartPool
+pod_name string
+pool actor.GrainPool[*CartGrain]
+klarnaClient *KlarnaClient
}
-func NewPoolServer(pool *CartPool, pod_name string) *PoolServer {
+func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer {
return &PoolServer{
-pod_name: pod_name,
-pool: pool,
+pod_name: pod_name,
+pool: pool,
+klarnaClient: klarnaClient,
}
}
@@ -181,24 +184,24 @@ func (s *PoolServer) HandleAddRequest(w http.ResponseWriter, r *http.Request, id
return s.WriteResult(w, reply)
}
-func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, id CartId) error {
-orderId := r.PathValue("orderId")
-if orderId == "" {
-return fmt.Errorf("orderId is empty")
-}
-order, err := KlarnaInstance.GetOrder(orderId)
+// func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, id CartId) error {
+// orderId := r.PathValue("orderId")
+// if orderId == "" {
+// return fmt.Errorf("orderId is empty")
+// }
+// order, err := KlarnaInstance.GetOrder(orderId)
-if err != nil {
-return err
-}
+// if err != nil {
+// return err
+// }
-w.Header().Set("Content-Type", "application/json")
-w.Header().Set("X-Pod-Name", s.pod_name)
-w.Header().Set("Cache-Control", "no-cache")
-w.Header().Set("Access-Control-Allow-Origin", "*")
-w.WriteHeader(http.StatusOK)
-return json.NewEncoder(w).Encode(order)
-}
+// w.Header().Set("Content-Type", "application/json")
+// w.Header().Set("X-Pod-Name", s.pod_name)
+// w.Header().Set("Cache-Control", "no-cache")
+// w.Header().Set("Access-Control-Allow-Origin", "*")
+// w.WriteHeader(http.StatusOK)
+// return json.NewEncoder(w).Encode(order)
+// }
func getCurrency(country string) string {
if country == "no" {
@@ -240,9 +243,9 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr
}
if grain.OrderReference != "" {
-return KlarnaInstance.UpdateOrder(grain.OrderReference, bytes.NewReader(payload))
+return s.klarnaClient.UpdateOrder(grain.OrderReference, bytes.NewReader(payload))
} else {
-return KlarnaInstance.CreateOrder(bytes.NewReader(payload))
+return s.klarnaClient.CreateOrder(bytes.NewReader(payload))
}
}
@@ -255,17 +258,19 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId)
})
}
-func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id CartId) error {
-klarnaOrder, err := s.CreateOrUpdateCheckout(r.Host, id)
-if err != nil {
-return err
-}
+// func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id CartId) error {
+// klarnaOrder, err := s.CreateOrUpdateCheckout(r.Host, id)
+// if err != nil {
+// return err
+// }
-s.ApplyCheckoutStarted(klarnaOrder, id)
+// s.ApplyCheckoutStarted(klarnaOrder, id)
+// w.Header().Set("Content-Type", "application/json")
+// return json.NewEncoder(w).Encode(klarnaOrder)
+// }
+//
-w.Header().Set("Content-Type", "application/json")
-return json.NewEncoder(w).Encode(klarnaOrder)
-}
func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
@@ -391,8 +396,8 @@ func (s *PoolServer) Serve() *http.ServeMux {
mux.HandleFunc("POST /delivery", CookieCartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))
mux.HandleFunc("DELETE /delivery/{deliveryId}", CookieCartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))
mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", CookieCartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))
mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout)))
mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation)))
//mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout)))
//mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation)))
mux.HandleFunc("GET /byid/{id}", CartIdHandler(s.ProxyHandler(s.HandleGet)))
mux.HandleFunc("GET /byid/{id}/add/{sku}", CartIdHandler(s.ProxyHandler(s.HandleAddSku)))
@@ -402,8 +407,8 @@ func (s *PoolServer) Serve() *http.ServeMux {
mux.HandleFunc("POST /byid/{id}/delivery", CartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))
mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", CartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))
mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", CartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))
mux.HandleFunc("GET /byid/{id}/checkout", CartIdHandler(s.ProxyHandler(s.HandleCheckout)))
mux.HandleFunc("GET /byid/{id}/confirmation", CartIdHandler(s.ProxyHandler(s.HandleConfirmation)))
//mux.HandleFunc("GET /byid/{id}/checkout", CartIdHandler(s.ProxyHandler(s.HandleCheckout)))
//mux.HandleFunc("GET /byid/{id}/confirmation", CartIdHandler(s.ProxyHandler(s.HandleConfirmation)))
return mux
}
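For reference (not part of this diff): the routes above compose CookieCartIdHandler (cart id from a cookie) with ProxyHandler (serve locally or forward to the owning host). A standalone sketch of that composition follows; the cookie name, error handling, and the ownerOf/forward helpers are illustrative assumptions, not the repo's exact logic.

package main

import (
    "fmt"
    "log"
    "net/http"
    "strconv"
)

type CartId uint64

// cartHandler mirrors the handler shape used by the pool server.
type cartHandler func(w http.ResponseWriter, r *http.Request, id CartId) error

// cookieCartIdHandler resolves the cart id from a cookie and adapts the
// handler into a plain http.HandlerFunc.
func cookieCartIdHandler(fn func(id CartId, w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        c, err := r.Cookie("cart_id") // assumed cookie name
        if err != nil {
            http.Error(w, "missing cart cookie", http.StatusBadRequest)
            return
        }
        raw, err := strconv.ParseUint(c.Value, 10, 64)
        if err != nil {
            http.Error(w, "bad cart id", http.StatusBadRequest)
            return
        }
        if err := fn(CartId(raw), w, r); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
        }
    }
}

// proxyHandler decides between local handling and forwarding to the owner;
// ownerOf and forward stand in for the pool's OwnerHost and Host.Proxy calls.
func proxyHandler(fn cartHandler, ownerOf func(CartId) (string, bool), forward func(host string, id CartId, w http.ResponseWriter, r *http.Request) error) func(id CartId, w http.ResponseWriter, r *http.Request) error {
    return func(id CartId, w http.ResponseWriter, r *http.Request) error {
        if host, remote := ownerOf(id); remote {
            return forward(host, id, w, r)
        }
        return fn(w, r, id)
    }
}

func main() {
    getCart := func(w http.ResponseWriter, r *http.Request, id CartId) error {
        fmt.Fprintf(w, "cart %d served locally\n", id)
        return nil
    }
    ownerOf := func(CartId) (string, bool) { return "", false } // always local in this sketch
    forward := func(string, CartId, http.ResponseWriter, *http.Request) error { return nil }

    mux := http.NewServeMux()
    mux.HandleFunc("GET /cart", cookieCartIdHandler(proxyHandler(getCart, ownerOf, forward)))
    log.Fatal(http.ListenAndServe(":8080", mux))
}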

View File

@@ -1,6 +1,8 @@
package actor
import "net/http"
import (
"net/http"
)
type GrainPool[V any] interface {
Apply(id uint64, mutation any) (V, error)
@@ -14,12 +16,19 @@ type GrainPool[V any] interface {
GetLocalIds() []uint64
RemoveHost(host string)
IsHealthy() bool
IsKnown(string) bool
Close()
}
// Host abstracts a remote node capable of proxying cart requests.
type Host interface {
AnnounceExpiry(ids []uint64)
Negotiate(otherHosts []string) ([]string, error)
Name() string
Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error)
GetActorIds() []uint64
Close() error
Ping() bool
IsHealthy() bool
AnnounceOwnership(ids []uint64)
}
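The Host interface above is what the generic pool now depends on instead of *proxy.RemoteHost directly. For reference (not part of this diff), a minimal in-memory stub that satisfies it, e.g. for tests; the test package and field names are assumptions.

package actor_test

import (
    "net/http"

    "git.tornberg.me/go-cart-actor/pkg/actor"
)

// stubHost records announcements in memory instead of talking gRPC/HTTP.
type stubHost struct {
    name    string
    owned   []uint64
    expired []uint64
}

func (s *stubHost) Name() string                   { return s.name }
func (s *stubHost) AnnounceOwnership(ids []uint64) { s.owned = append(s.owned, ids...) }
func (s *stubHost) AnnounceExpiry(ids []uint64)    { s.expired = append(s.expired, ids...) }
func (s *stubHost) Negotiate(otherHosts []string) ([]string, error) { return nil, nil }
func (s *stubHost) GetActorIds() []uint64          { return s.owned }
func (s *stubHost) Ping() bool                     { return true }
func (s *stubHost) IsHealthy() bool                { return true }
func (s *stubHost) Close() error                   { return nil }
func (s *stubHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) {
    return false, nil
}

// Compile-time check that stubHost implements actor.Host.
var _ actor.Host = (*stubHost)(nil)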

View File

@@ -76,15 +76,31 @@ func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNot
}, nil
}
type ServerConfig struct {
Addr string
Options []grpc.ServerOption
}
func NewServerConfig(addr string, options ...grpc.ServerOption) ServerConfig {
return ServerConfig{
Addr: addr,
Options: options,
}
}
func DefaultServerConfig() ServerConfig {
return NewServerConfig(":1337")
}
// StartGRPCServer configures and starts the unified gRPC server on the given address.
// It registers both the CartActor and ControlPlane services.
-func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, error) {
-lis, err := net.Listen("tcp", addr)
+func NewControlServer[V any](config ServerConfig, pool GrainPool[V]) (*grpc.Server, error) {
+lis, err := net.Listen("tcp", config.Addr)
if err != nil {
return nil, fmt.Errorf("failed to listen: %w", err)
}
-grpcServer := grpc.NewServer()
+grpcServer := grpc.NewServer(config.Options...)
server := &ControlServer[V]{
pool: pool,
}
@@ -92,7 +108,7 @@ func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, erro
messages.RegisterControlPlaneServer(grpcServer, server)
reflection.Register(grpcServer)
log.Printf("gRPC server listening on %s", addr)
log.Printf("gRPC server listening on %s", config.Addr)
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve gRPC: %v", err)

View File

@@ -0,0 +1,413 @@
package actor
import (
"fmt"
"log"
"maps"
"sync"
"time"
)
type SimpleGrainPool[V any] struct {
// fields and methods
localMu sync.RWMutex
grains map[uint64]Grain[V]
spawn func(id uint64) (Grain[V], error)
spawnHost func(host string) (Host, error)
ttl time.Duration
poolSize int
// Cluster coordination --------------------------------------------------
hostname string
remoteMu sync.RWMutex
remoteOwners map[uint64]Host
remoteHosts map[string]Host
//discardedHostHandler *DiscardedHostHandler
// House-keeping ---------------------------------------------------------
purgeTicker *time.Ticker
}
func NewSimpleGrainPool[V any](size int, ttl time.Duration, hostname string, spawn func(id uint64) (Grain[V], error), spawnHost func(host string) (Host, error)) (*SimpleGrainPool[V], error) {
p := &SimpleGrainPool[V]{
grains: make(map[uint64]Grain[V]),
spawn: spawn,
spawnHost: spawnHost,
ttl: ttl,
poolSize: size,
hostname: hostname,
remoteOwners: make(map[uint64]Host),
remoteHosts: make(map[string]Host),
}
p.purgeTicker = time.NewTicker(time.Minute)
go func() {
for range p.purgeTicker.C {
p.purge()
}
}()
return p, nil
}
func (p *SimpleGrainPool[V]) purge() {
purgeLimit := time.Now().Add(-p.ttl)
purgedIds := make([]uint64, 0, len(p.grains))
p.localMu.Lock()
for id, grain := range p.grains {
if grain.GetLastAccess().Before(purgeLimit) {
purgedIds = append(purgedIds, id)
delete(p.grains, id)
}
}
p.localMu.Unlock()
p.forAllHosts(func(remote Host) {
remote.AnnounceExpiry(purgedIds)
})
}
// LocalUsage returns the number of resident grains and configured capacity.
func (p *SimpleGrainPool[V]) LocalUsage() (int, int) {
p.localMu.RLock()
defer p.localMu.RUnlock()
return len(p.grains), p.poolSize
}
// LocalCartIDs returns the currently owned cart ids (for control-plane RPCs).
func (p *SimpleGrainPool[V]) GetLocalIds() []uint64 {
p.localMu.RLock()
defer p.localMu.RUnlock()
ids := make([]uint64, 0, len(p.grains))
for _, g := range p.grains {
if g == nil {
continue
}
ids = append(ids, uint64(g.GetId()))
}
return ids
}
func (p *SimpleGrainPool[V]) HandleRemoteExpiry(host string, ids []uint64) error {
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
for _, id := range ids {
delete(p.remoteOwners, id)
}
return nil
}
func (p *SimpleGrainPool[V]) HandleOwnershipChange(host string, ids []uint64) error {
p.remoteMu.RLock()
remoteHost, exists := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !exists {
createdHost, err := p.AddRemote(host)
if err != nil {
return err
}
remoteHost = createdHost
}
p.remoteMu.Lock()
defer p.remoteMu.Unlock()
p.localMu.Lock()
defer p.localMu.Unlock()
for _, id := range ids {
log.Printf("Handling ownership change for cart %d to host %s", id, host)
delete(p.grains, id)
p.remoteOwners[id] = remoteHost
}
return nil
}
// TakeOwnership takes ownership of a grain.
func (p *SimpleGrainPool[V]) TakeOwnership(id uint64) {
p.broadcastOwnership([]uint64{id})
}
func (p *SimpleGrainPool[V]) AddRemote(host string) (Host, error) {
if host == "" || host == p.hostname || p.IsKnown(host) {
return nil, fmt.Errorf("invalid host")
}
remote, err := p.spawnHost(host)
if err != nil {
log.Printf("AddRemote %s failed: %v", host, err)
return nil, err
}
p.remoteMu.Lock()
p.remoteHosts[host] = remote
p.remoteMu.Unlock()
// connectedRemotes.Set(float64(p.RemoteCount()))
log.Printf("Connected to remote host %s", host)
go p.pingLoop(remote)
go p.initializeRemote(remote)
go p.SendNegotiation()
return remote, nil
}
func (p *SimpleGrainPool[V]) initializeRemote(remote Host) {
remotesIds := remote.GetActorIds()
p.remoteMu.Lock()
for _, id := range remotesIds {
p.localMu.Lock()
delete(p.grains, id)
p.localMu.Unlock()
if _, exists := p.remoteOwners[id]; !exists {
p.remoteOwners[id] = remote
}
}
p.remoteMu.Unlock()
}
func (p *SimpleGrainPool[V]) RemoveHost(host string) {
p.remoteMu.Lock()
remote, exists := p.remoteHosts[host]
if exists {
go remote.Close()
delete(p.remoteHosts, host)
}
count := 0
for id, owner := range p.remoteOwners {
if owner.Name() == host {
count++
delete(p.remoteOwners, id)
}
}
log.Printf("Removing host %s, grains: %d", host, count)
p.remoteMu.Unlock()
if exists {
remote.Close()
}
// connectedRemotes.Set(float64(p.RemoteCount()))
}
func (p *SimpleGrainPool[V]) RemoteCount() int {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
return len(p.remoteHosts)
}
// RemoteHostNames returns a snapshot of connected remote host identifiers.
func (p *SimpleGrainPool[V]) RemoteHostNames() []string {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
hosts := make([]string, 0, len(p.remoteHosts))
for host := range p.remoteHosts {
hosts = append(hosts, host)
}
return hosts
}
func (p *SimpleGrainPool[V]) IsKnown(host string) bool {
if host == p.hostname {
return true
}
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
_, ok := p.remoteHosts[host]
return ok
}
func (p *SimpleGrainPool[V]) pingLoop(remote Host) {
remote.Ping()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !remote.Ping() {
if !remote.IsHealthy() {
log.Printf("Remote %s unhealthy, removing", remote.Name())
p.Close()
p.RemoveHost(remote.Name())
return
}
continue
}
}
}
func (p *SimpleGrainPool[V]) IsHealthy() bool {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
for _, r := range p.remoteHosts {
if !r.IsHealthy() {
return false
}
}
return true
}
func (p *SimpleGrainPool[V]) Negotiate(otherHosts []string) {
for _, host := range otherHosts {
if host != p.hostname {
p.remoteMu.RLock()
_, ok := p.remoteHosts[host]
p.remoteMu.RUnlock()
if !ok {
go p.AddRemote(host)
}
}
}
}
func (p *SimpleGrainPool[V]) SendNegotiation() {
//negotiationCount.Inc()
p.remoteMu.RLock()
hosts := make([]string, 0, len(p.remoteHosts)+1)
hosts = append(hosts, p.hostname)
remotes := make([]Host, 0, len(p.remoteHosts))
for h, r := range p.remoteHosts {
hosts = append(hosts, h)
remotes = append(remotes, r)
}
p.remoteMu.RUnlock()
p.forAllHosts(func(remote Host) {
knownByRemote, err := remote.Negotiate(hosts)
if err != nil {
log.Printf("Negotiate with %s failed: %v", remote.Name(), err)
return
}
for _, h := range knownByRemote {
if !p.IsKnown(h) {
go p.AddRemote(h)
}
}
})
}
func (p *SimpleGrainPool[V]) forAllHosts(fn func(Host)) {
p.remoteMu.RLock()
rh := maps.Clone(p.remoteHosts)
p.remoteMu.RUnlock()
wg := sync.WaitGroup{}
for _, host := range rh {
wg.Go(func() { fn(host) })
}
for name, host := range rh {
if !host.IsHealthy() {
host.Close()
p.remoteMu.Lock()
delete(p.remoteHosts, name)
p.remoteMu.Unlock()
}
}
}
func (p *SimpleGrainPool[V]) broadcastOwnership(ids []uint64) {
if len(ids) == 0 {
return
}
p.forAllHosts(func(rh Host) {
rh.AnnounceOwnership(ids)
})
log.Printf("taking ownership of %d ids", len(ids))
// go p.statsUpdate()
}
func (p *SimpleGrainPool[V]) getOrClaimGrain(id uint64) (Grain[V], error) {
p.localMu.RLock()
grain, exists := p.grains[id]
p.localMu.RUnlock()
if exists && grain != nil {
return grain, nil
}
grain, err := p.spawn(id)
if err != nil {
return nil, err
}
p.localMu.Lock()
p.grains[id] = grain
p.localMu.Unlock()
go p.broadcastOwnership([]uint64{id})
return grain, nil
}
// ErrNotOwner is returned when a cart belongs to another host.
var ErrNotOwner = fmt.Errorf("not owner")
// Apply applies a mutation to a grain.
func (p *SimpleGrainPool[V]) Apply(id uint64, mutation any) (*V, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
//start := time.Now()
result, applyErr := grain.Apply(mutation, false)
//mutationType := "unknown"
// if mutation != nil {
// if t := reflect.TypeOf(mutation); t != nil {
// if t.Kind() == reflect.Pointer {
// t = t.Elem()
// }
// if t.Name() != "" {
// mutationType = t.Name()
// }
// }
// }
// cartMutationLatencySeconds.WithLabelValues(mutationType).Observe(time.Since(start).Seconds())
// if applyErr == nil && result != nil {
// cartMutationsTotal.Inc()
// } else if applyErr != nil {
// cartMutationFailuresTotal.Inc()
// }
return result, applyErr
}
// Get returns the current state of a grain.
func (p *SimpleGrainPool[V]) Get(id uint64) (*V, error) {
grain, err := p.getOrClaimGrain(id)
if err != nil {
return nil, err
}
return grain.GetCurrentState()
}
// OwnerHost reports the remote owner (if any) for the supplied cart id.
func (p *SimpleGrainPool[V]) OwnerHost(id uint64) (Host, bool) {
p.remoteMu.RLock()
defer p.remoteMu.RUnlock()
owner, ok := p.remoteOwners[id]
return owner, ok
}
// Hostname returns the local hostname (pod IP).
func (p *SimpleGrainPool[V]) Hostname() string {
return p.hostname
}
// Close notifies remotes that this host is shutting down.
func (p *SimpleGrainPool[V]) Close() {
p.forAllHosts(func(rh Host) {
rh.Close()
})
if p.purgeTicker != nil {
p.purgeTicker.Stop()
}
}
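For context (not part of this diff): a minimal usage sketch of the new generic pool with a toy grain. The exact shape of actor.Grain[V] is inferred from how SimpleGrainPool calls its grains (GetId, GetLastAccess, Apply, GetCurrentState), so the interface methods and types below are assumptions rather than the real cart grain.

package main

import (
    "fmt"
    "log"
    "time"

    "git.tornberg.me/go-cart-actor/pkg/actor"
)

// counterState is the grain's state type (the V in SimpleGrainPool[V]).
type counterState struct{ Count int }

// incr is a toy mutation.
type incr struct{}

// counterGrain implements the inferred actor.Grain[counterState] surface.
type counterGrain struct {
    id         uint64
    lastAccess time.Time
    state      counterState
}

func (g *counterGrain) GetId() uint64            { return g.id }
func (g *counterGrain) GetLastAccess() time.Time { return g.lastAccess }

func (g *counterGrain) GetCurrentState() (*counterState, error) {
    g.lastAccess = time.Now()
    return &g.state, nil
}

func (g *counterGrain) Apply(mutation any, isReplay bool) (*counterState, error) {
    g.lastAccess = time.Now()
    if _, ok := mutation.(incr); ok {
        g.state.Count++
    }
    return &g.state, nil
}

func main() {
    spawn := func(id uint64) (actor.Grain[counterState], error) {
        return &counterGrain{id: id, lastAccess: time.Now()}, nil
    }
    // Standalone deployment: no remotes, so spawnHost is never invoked.
    spawnHost := func(host string) (actor.Host, error) { return nil, nil }

    pool, err := actor.NewSimpleGrainPool(1024, 15*time.Minute, "localhost", spawn, spawnHost)
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    if _, err := pool.Apply(42, incr{}); err != nil {
        log.Fatal(err)
    }
    st, err := pool.Get(42)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("count:", st.Count) // 1
}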

View File

@@ -2,6 +2,7 @@ package proxy
import (
"context"
"errors"
"fmt"
"io"
"log"
@@ -37,24 +38,27 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
}
controlClient := messages.NewControlPlaneClient(conn)
-for retries := range 3 {
-ctx, pingCancel := context.WithTimeout(context.Background(), time.Second)
-_, pingErr := controlClient.Ping(ctx, &messages.Empty{})
-pingCancel()
-if pingErr == nil {
-break
-}
-if retries == 2 {
-log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
-conn.Close()
-return nil, pingErr
-}
-time.Sleep(500 * time.Millisecond)
-}
+// go func() {
+// for retries := range 3 {
+// ctx, pingCancel := context.WithTimeout(context.Background(), time.Second)
+// _, pingErr := controlClient.Ping(ctx, &messages.Empty{})
+// pingCancel()
+// if pingErr == nil {
+// break
+// }
+// if retries == 2 {
+// log.Printf("AddRemote: ping %s failed after retries: %v", host, pingErr)
+// conn.Close()
+// p
+// }
+// time.Sleep(500 * time.Millisecond)
+// }
+// }()
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
DisableKeepAlives: false,
IdleConnTimeout: 120 * time.Second,
}
client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
@@ -82,14 +86,19 @@ func (h *RemoteHost) Close() error {
}
func (h *RemoteHost) Ping() bool {
-ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-_, err := h.controlClient.Ping(ctx, &messages.Empty{})
-cancel()
-if err != nil {
-h.MissedPings++
-log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err)
-return false
+var err error = errors.ErrUnsupported
+for err != nil {
+ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+_, err = h.controlClient.Ping(ctx, &messages.Empty{})
+cancel()
+if err != nil {
+h.MissedPings++
+log.Printf("Ping %s failed (%d) %v", h.Host, h.MissedPings, err)
+}
+if !h.IsHealthy() {
+return false
+}
+time.Sleep(time.Millisecond * 200)
+}
h.MissedPings = 0