From b8266d80f9331d497b5734b341c33eba1c381009 Mon Sep 17 00:00:00 2001 From: matst80 Date: Sun, 12 Oct 2025 21:36:00 +0200 Subject: [PATCH] more stuff --- Makefile | 4 +- grain-pool.go => cart-grain-pool.go | 471 ++++++------------ cart-grain.go | 30 +- discarded-host.go | 84 ---- disk-storage.go | 8 +- event_log.go | 4 +- grpc_server.go | 121 ----- main.go | 23 +- mutation_add_item.go | 2 +- mutation_add_request.go | 2 +- mutation_change_quantity.go | 2 +- mutation_initialize_checkout.go | 2 +- mutation_order_created.go | 2 +- mutation_remove_delivery.go | 2 +- mutation_remove_item.go | 2 +- mutation_set_cart_items.go | 2 +- mutation_set_delivery.go | 2 +- mutation_set_pickup_point.go | 2 +- pkg/actor/grain.go | 11 + pkg/actor/grain_pool.go | 25 + pkg/actor/grpc_server.go | 105 ++++ discovery.go => pkg/discovery/discovery.go | 7 +- .../discovery/discovery_mock.go | 2 +- .../discovery/discovery_test.go | 2 +- pkg/discovery/types.go | 6 + {proto => pkg/messages}/control_plane.pb.go | 165 +++--- .../messages}/control_plane_grpc.pb.go | 32 +- {proto => pkg/messages}/messages.pb.go | 2 +- remotehost.go => pkg/proxy/remotehost.go | 90 ++-- pool-server.go | 134 ++--- proto/control_plane.proto | 10 +- 31 files changed, 578 insertions(+), 778 deletions(-) rename grain-pool.go => cart-grain-pool.go (53%) delete mode 100644 discarded-host.go delete mode 100644 grpc_server.go create mode 100644 pkg/actor/grain.go create mode 100644 pkg/actor/grain_pool.go create mode 100644 pkg/actor/grpc_server.go rename discovery.go => pkg/discovery/discovery.go (93%) rename discovery_mock.go => pkg/discovery/discovery_mock.go (99%) rename discovery_test.go => pkg/discovery/discovery_test.go (98%) create mode 100644 pkg/discovery/types.go rename {proto => pkg/messages}/control_plane.pb.go (73%) rename {proto => pkg/messages}/control_plane_grpc.pb.go (90%) rename {proto => pkg/messages}/messages.pb.go (99%) rename remotehost.go => pkg/proxy/remotehost.go (64%) diff --git a/Makefile b/Makefile index b688d69..cc5b50b 100644 --- a/Makefile +++ b/Makefile @@ -69,8 +69,8 @@ check_tools: protogen: check_tools @echo "$(YELLOW)Generating protobuf code (outputs -> ./proto)...$(RESET)" $(PROTOC) -I $(PROTO_DIR) \ - --go_out=./proto --go_opt=paths=source_relative \ - --go-grpc_out=./proto --go-grpc_opt=paths=source_relative \ + --go_out=./pkg/messages --go_opt=paths=source_relative \ + --go-grpc_out=./pkg/messages --go-grpc_opt=paths=source_relative \ $(PROTOS) @echo "$(GREEN)Protobuf generation complete.$(RESET)" diff --git a/grain-pool.go b/cart-grain-pool.go similarity index 53% rename from grain-pool.go rename to cart-grain-pool.go index c5574e9..7426fa8 100644 --- a/grain-pool.go +++ b/cart-grain-pool.go @@ -1,15 +1,16 @@ package main import ( - "context" "fmt" "log" - "net/http" + "maps" "reflect" "sync" "time" - messages "git.tornberg.me/go-cart-actor/proto" + "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/discovery" + "git.tornberg.me/go-cart-actor/pkg/proxy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/watch" @@ -53,43 +54,18 @@ var ( Help: "Latency of cart mutations in seconds", Buckets: prometheus.DefBuckets, }, []string{"mutation"}) - cartActiveGrains = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cart_active_grains", - Help: "Number of active (resident) local grains", - }) ) // GrainPool is the interface exposed to HTTP handlers and other subsystems. -type GrainPool interface { - Apply(id CartId, mutation interface{}) (*CartGrain, error) - Get(id CartId) (*CartGrain, error) - OwnerHost(id CartId) (Host, bool) - Hostname() string - TakeOwnership(id CartId) - IsHealthy() bool - Close() -} - -// Host abstracts a remote node capable of proxying cart requests. -type Host interface { - Name() string - Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) -} - -// Ttl tracks the expiry deadline for an in-memory grain. -type Ttl struct { - Expires time.Time - Grain *CartGrain -} // CartPool merges the responsibilities that previously belonged to // GrainLocalPool and SyncedPool. It provides local grain storage together // with cluster coordination, ownership negotiation and expiry signalling. type CartPool struct { // Local grain state ----------------------------------------------------- - localMu sync.RWMutex - grains map[uint64]*CartGrain - expiry []Ttl + localMu sync.RWMutex + grains map[uint64]*CartGrain + spawn func(id CartId) (*CartGrain, error) ttl time.Duration poolSize int @@ -97,8 +73,8 @@ type CartPool struct { // Cluster coordination -------------------------------------------------- hostname string remoteMu sync.RWMutex - remoteOwners map[CartId]*RemoteHostGRPC - remoteHosts map[string]*RemoteHostGRPC + remoteOwners map[uint64]*proxy.RemoteHost + remoteHosts map[string]*proxy.RemoteHost //discardedHostHandler *DiscardedHostHandler // House-keeping --------------------------------------------------------- @@ -107,30 +83,27 @@ type CartPool struct { // NewCartPool constructs a unified pool. Discovery may be nil for standalone // deployments. -func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), discovery Discovery) (*CartPool, error) { +func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id CartId) (*CartGrain, error), hostWatch discovery.Discovery) (*CartPool, error) { p := &CartPool{ - grains: make(map[uint64]*CartGrain), - expiry: make([]Ttl, 0), + grains: make(map[uint64]*CartGrain), + spawn: spawn, ttl: ttl, poolSize: size, hostname: hostname, - remoteOwners: make(map[CartId]*RemoteHostGRPC), - remoteHosts: make(map[string]*RemoteHostGRPC), + remoteOwners: make(map[uint64]*proxy.RemoteHost), + remoteHosts: make(map[string]*proxy.RemoteHost), } - // p.discardedHostHandler = NewDiscardedHostHandler(1338) - // p.discardedHostHandler.SetReconnectHandler(p.AddRemote) - p.purgeTicker = time.NewTicker(time.Minute) go func() { for range p.purgeTicker.C { - p.Purge() + p.purge() } }() - if discovery != nil { - go p.startDiscovery(discovery) + if hostWatch != nil { + go p.startDiscovery(hostWatch) } else { log.Printf("No discovery configured; expecting manual AddRemote or static host injection") } @@ -138,8 +111,26 @@ func NewCartPool(size int, ttl time.Duration, hostname string, spawn func(id Car return p, nil } +func (p *CartPool) purge() { + purgeLimit := time.Now().Add(-p.ttl) + purgedIds := make([]uint64, 0, len(p.grains)) + p.localMu.Lock() + for id, grain := range p.grains { + if grain.GetLastAccess().Before(purgeLimit) { + purgedIds = append(purgedIds, id) + + delete(p.grains, id) + } + } + p.localMu.Unlock() + p.forAllHosts(func(remote *proxy.RemoteHost) { + remote.AnnounceExpiry(purgedIds) + }) + +} + // startDiscovery subscribes to cluster events and adds/removes hosts. -func (p *CartPool) startDiscovery(discovery Discovery) { +func (p *CartPool) startDiscovery(discovery discovery.Discovery) { time.Sleep(3 * time.Second) // allow gRPC server startup log.Printf("Starting discovery watcher") ch, err := discovery.Watch() @@ -188,84 +179,8 @@ func (p *CartPool) LocalUsage() (int, int) { return len(p.grains), p.poolSize } -// SetAvailable pre-populates placeholder entries. -func (p *CartPool) SetAvailable(availableWithLastChangeUnix map[CartId]int64) { - p.localMu.Lock() - defer p.localMu.Unlock() - for id := range availableWithLastChangeUnix { - k := uint64(id) - if _, ok := p.grains[k]; !ok { - p.grains[k] = nil - p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl)}) - } - } - p.statsUpdate() -} - -// Purge removes expired grains and broadcasts expiry announcements so that -// other hosts drop stale ownership hints. -func (p *CartPool) Purge() { - now := time.Now() - keepChanged := now.Add(-p.ttl).Unix() - var expired []CartId - - p.localMu.Lock() - for i := 0; i < len(p.expiry); { - entry := p.expiry[i] - if entry.Grain == nil { - i++ - continue - } - if entry.Expires.After(now) { - break - } - if entry.Grain.GetLastChange() > keepChanged { - // Recently mutated: move to back. - p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) - p.expiry = append(p.expiry, entry) - continue - } - id := entry.Grain.GetId() - delete(p.grains, uint64(id)) - expired = append(expired, id) - if i < len(p.expiry)-1 { - p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) - } else { - p.expiry = p.expiry[:i] - } - } - p.localMu.Unlock() - - if len(expired) > 0 { - p.statsUpdate() - go p.broadcastExpiry(expired) - } -} - -// RefreshExpiry updates the TTL entry for a given grain. -func (p *CartPool) RefreshExpiry(id CartId) { - p.localMu.Lock() - defer p.localMu.Unlock() - for i := range p.expiry { - g := p.expiry[i].Grain - if g != nil && g.Id == id { - p.expiry[i].Expires = time.Now().Add(p.ttl) - return - } - } - // If no entry existed, append one (safeguard for newly spawned grains). - p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: p.grains[uint64(id)]}) -} - -// DebugGrainCount returns the number of locally resident grains. -func (p *CartPool) DebugGrainCount() int { - p.localMu.RLock() - defer p.localMu.RUnlock() - return len(p.grains) -} - // LocalCartIDs returns the currently owned cart ids (for control-plane RPCs). -func (p *CartPool) LocalCartIDs() []uint64 { +func (p *CartPool) GetLocalIds() []uint64 { p.localMu.RLock() defer p.localMu.RUnlock() ids := make([]uint64, 0, len(p.grains)) @@ -278,6 +193,38 @@ func (p *CartPool) LocalCartIDs() []uint64 { return ids } +func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) error { + p.remoteMu.Lock() + defer p.remoteMu.Unlock() + for _, id := range ids { + delete(p.remoteOwners, id) + } + return nil +} + +func (p *CartPool) HandleOwnershipChange(host string, ids []uint64) error { + + p.remoteMu.RLock() + remoteHost, exists := p.remoteHosts[host] + p.remoteMu.RUnlock() + if !exists { + createdHost, err := p.AddRemote(host) + if err != nil { + return err + } + remoteHost = createdHost + } + p.remoteMu.Lock() + defer p.remoteMu.Unlock() + p.localMu.Lock() + defer p.localMu.Unlock() + for _, id := range ids { + delete(p.grains, id) + p.remoteOwners[id] = remoteHost + } + return nil +} + // SnapshotGrains returns a copy of the currently resident grains keyed by id. func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain { p.localMu.RLock() @@ -291,82 +238,40 @@ func (p *CartPool) SnapshotGrains() map[CartId]*CartGrain { return out } -func (p *CartPool) removeLocalGrain(id CartId) { - p.localMu.Lock() - delete(p.grains, uint64(id)) - for i := range p.expiry { - if p.expiry[i].Grain != nil && p.expiry[i].Grain.GetId() == id { - p.expiry = append(p.expiry[:i], p.expiry[i+1:]...) - break - } - } - p.localMu.Unlock() - p.statsUpdate() -} +// func (p *CartPool) getLocalGrain(key uint64) (*CartGrain, error) { -func (p *CartPool) getLocalGrain(id CartId) (*CartGrain, error) { - key := uint64(id) - grainLookups.Inc() +// grainLookups.Inc() - p.localMu.RLock() - grain, ok := p.grains[key] - p.localMu.RUnlock() - if grain != nil && ok { - return grain, nil - } +// p.localMu.RLock() +// grain, ok := p.grains[key] +// p.localMu.RUnlock() +// if grain != nil && ok { +// return grain, nil +// } - p.localMu.Lock() - defer p.localMu.Unlock() - grain, ok = p.grains[key] - if grain == nil || !ok { - if len(p.grains) >= p.poolSize && len(p.expiry) > 0 { - if p.expiry[0].Expires.Before(time.Now()) && p.expiry[0].Grain != nil { - oldID := p.expiry[0].Grain.GetId() - delete(p.grains, uint64(oldID)) - p.expiry = p.expiry[1:] - go p.broadcastExpiry([]CartId{oldID}) - } else { - return nil, fmt.Errorf("pool is full") - } - } - spawned, err := p.spawn(id) - if err != nil { - return nil, err - } - p.grains[key] = spawned - p.expiry = append(p.expiry, Ttl{Expires: time.Now().Add(p.ttl), Grain: spawned}) - grain = spawned - } - go p.statsUpdate() - return grain, nil -} +// go p.statsUpdate() +// return grain, nil +// } // --------------------------------------------------------------------------- // Cluster ownership and coordination // --------------------------------------------------------------------------- -func (p *CartPool) TakeOwnership(id CartId) { - p.broadcastOwnership([]CartId{id}) +func (p *CartPool) TakeOwnership(id uint64) { + p.broadcastOwnership([]uint64{id}) } -func (p *CartPool) AddRemote(host string) (*RemoteHostGRPC, error) { - if host == "" || host == p.hostname { +func (p *CartPool) AddRemote(host string) (*proxy.RemoteHost, error) { + if host == "" || host == p.hostname || p.IsKnown(host) { return nil, fmt.Errorf("invalid host") } - p.remoteMu.Lock() - if _, exists := p.remoteHosts[host]; exists { - p.remoteMu.Unlock() - return nil, fmt.Errorf("host already exists") - } - p.remoteMu.Unlock() - - remote, err := NewRemoteHostGRPC(host) + remote, err := proxy.NewRemoteHost(host) if err != nil { log.Printf("AddRemote: NewRemoteHostGRPC %s failed: %v", host, err) + return nil, err } - p.remoteMu.Lock() p.remoteHosts[host] = remote p.remoteMu.Unlock() @@ -375,29 +280,25 @@ func (p *CartPool) AddRemote(host string) (*RemoteHostGRPC, error) { log.Printf("Connected to remote host %s", host) go p.pingLoop(remote) go p.initializeRemote(remote) - go p.Negotiate() + go p.SendNegotiation() return remote, nil } -func (p *CartPool) initializeRemote(remote *RemoteHostGRPC) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - reply, err := remote.ControlClient.GetCartIds(ctx, &messages.Empty{}) - if err != nil { - log.Printf("Init remote %s: GetCartIds error: %v", remote.Host, err) - return - } - count := 0 +func (p *CartPool) initializeRemote(remote *proxy.RemoteHost) { + + remotesIds := remote.GetActorIds() + p.remoteMu.Lock() - for _, cid := range reply.CartIds { - id := CartId(cid) + for _, id := range remotesIds { + p.localMu.Lock() + delete(p.grains, id) + p.localMu.Unlock() if _, exists := p.remoteOwners[id]; !exists { p.remoteOwners[id] = remote } - count++ } p.remoteMu.Unlock() - log.Printf("Remote %s reported %d remote-owned carts", remote.Host, count) + } func (p *CartPool) RemoveHost(host string) { @@ -415,7 +316,7 @@ func (p *CartPool) RemoveHost(host string) { p.remoteMu.Unlock() if exists { - remote.Conn.Close() + remote.Close() } connectedRemotes.Set(float64(p.RemoteCount())) } @@ -447,7 +348,7 @@ func (p *CartPool) IsKnown(host string) bool { return ok } -func (p *CartPool) pingLoop(remote *RemoteHostGRPC) { +func (p *CartPool) pingLoop(remote *proxy.RemoteHost) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { @@ -474,13 +375,27 @@ func (p *CartPool) IsHealthy() bool { return true } -func (p *CartPool) Negotiate() { +func (p *CartPool) Negotiate(otherHosts []string) { + + for _, host := range otherHosts { + if host != p.hostname { + p.remoteMu.RLock() + _, ok := p.remoteHosts[host] + p.remoteMu.RUnlock() + if !ok { + go p.AddRemote(host) + } + } + } +} + +func (p *CartPool) SendNegotiation() { negotiationCount.Inc() p.remoteMu.RLock() hosts := make([]string, 0, len(p.remoteHosts)+1) hosts = append(hosts, p.hostname) - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) + remotes := make([]*proxy.RemoteHost, 0, len(p.remoteHosts)) for h, r := range p.remoteHosts { hosts = append(hosts, h) remotes = append(remotes, r) @@ -502,130 +417,53 @@ func (p *CartPool) Negotiate() { } } -func (p *CartPool) broadcastOwnership(ids []CartId) { +func (p *CartPool) forAllHosts(fn func(*proxy.RemoteHost)) { + p.remoteMu.RLock() + rh := maps.Clone(p.remoteHosts) + p.remoteMu.RUnlock() + + wg := sync.WaitGroup{} + for _, host := range rh { + + wg.Go(func() { fn(host) }) + + } + + for name, host := range rh { + if !host.IsHealthy() { + host.Close() + p.remoteMu.Lock() + delete(p.remoteHosts, name) + p.remoteMu.Unlock() + } + + } +} + +func (p *CartPool) broadcastOwnership(ids []uint64) { if len(ids) == 0 { return } - uids := make([]uint64, len(ids)) - for i, id := range ids { - uids[i] = uint64(id) - } - p.remoteMu.RLock() - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) - for _, r := range p.remoteHosts { - if r.IsHealthy() { - remotes = append(remotes, r) - } else { - log.Printf("Skipping announce to unhealthy remote %s", r.Host) - p.RemoveHost(r.Host) - } - } - p.remoteMu.RUnlock() + p.forAllHosts(func(rh *proxy.RemoteHost) { + rh.AnnounceOwnership(ids) + }) - for _, remote := range remotes { - go func(rh *RemoteHostGRPC) { - rh.AnnounceOwnership(uids) - }(remote) - } } -func (p *CartPool) broadcastExpiry(ids []CartId) { - if len(ids) == 0 { - return - } - uids := make([]uint64, len(ids)) - for i, id := range ids { - uids[i] = uint64(id) - } - - p.remoteMu.RLock() - remotes := make([]*RemoteHostGRPC, 0, len(p.remoteHosts)) - for _, r := range p.remoteHosts { - if r.IsHealthy() { - remotes = append(remotes, r) - } - } - p.remoteMu.RUnlock() - - for _, remote := range remotes { - go func(rh *RemoteHostGRPC) { - rh.AnnounceExpiry(uids) - }(remote) - } -} - -func (p *CartPool) AdoptRemoteOwnership(host string, ids []string) { - if host == "" || host == p.hostname { - return - } - remoteHost, ok := p.remoteHosts[host] - if !ok { - log.Printf("AdoptRemoteOwnership: unknown host %s", host) - createdHost, err := p.AddRemote(host) - if err != nil { - log.Printf("AdoptRemoteOwnership: failed to add remote %s: %v", host, err) - return - } - remoteHost = createdHost - } - p.remoteMu.Lock() - defer p.remoteMu.Unlock() - for _, s := range ids { - if s == "" { - continue - } - parsed, ok := ParseCartId(s) - if !ok { - continue - } - if existing, ok := p.remoteOwners[parsed]; ok && existing != remoteHost { - continue - } - p.localMu.RLock() - _, localHas := p.grains[uint64(parsed)] - p.localMu.RUnlock() - if localHas { - continue - } - p.remoteOwners[parsed] = remoteHost - } -} - -func (p *CartPool) HandleRemoteExpiry(host string, ids []uint64) { - if host == "" || host == p.hostname { - return - } - p.remoteMu.Lock() - defer p.remoteMu.Unlock() - for _, raw := range ids { - id := CartId(raw) - if owner, ok := p.remoteOwners[id]; ok && owner.Host == host { - delete(p.remoteOwners, id) - } - } -} - -func (p *CartPool) getOrClaimGrain(id CartId) (*CartGrain, error) { +func (p *CartPool) getOrClaimGrain(id uint64) (*CartGrain, error) { p.localMu.RLock() - grain, exists := p.grains[uint64(id)] + grain, exists := p.grains[id] p.localMu.RUnlock() if exists && grain != nil { return grain, nil } - p.remoteMu.RLock() - remoteHost, found := p.remoteOwners[id] - p.remoteMu.RUnlock() - if found && remoteHost != nil && remoteHost.Host != "" { - return nil, ErrNotOwner - } - - grain, err := p.getLocalGrain(id) + grain, err := p.spawn(CartId(id)) if err != nil { return nil, err } - go p.broadcastOwnership([]CartId{id}) + go p.broadcastOwnership([]uint64{id}) return grain, nil } @@ -633,7 +471,7 @@ func (p *CartPool) getOrClaimGrain(id CartId) (*CartGrain, error) { var ErrNotOwner = fmt.Errorf("not owner") // Apply applies a mutation to a grain. -func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { +func (p *CartPool) Apply(id uint64, mutation any) (*CartGrain, error) { grain, err := p.getOrClaimGrain(id) if err != nil { return nil, err @@ -643,7 +481,7 @@ func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { mutationType := "unknown" if mutation != nil { if t := reflect.TypeOf(mutation); t != nil { - if t.Kind() == reflect.Ptr { + if t.Kind() == reflect.Pointer { t = t.Elem() } if t.Name() != "" { @@ -655,8 +493,8 @@ func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { if applyErr == nil && result != nil { cartMutationsTotal.Inc() - p.RefreshExpiry(id) - cartActiveGrains.Set(float64(p.DebugGrainCount())) + //p.RefreshExpiry(id) + //cartActiveGrains.Set(float64(len(p.grains))) } else if applyErr != nil { cartMutationFailuresTotal.Inc() } @@ -665,7 +503,7 @@ func (p *CartPool) Apply(id CartId, mutation interface{}) (*CartGrain, error) { } // Get returns the current state of a grain. -func (p *CartPool) Get(id CartId) (*CartGrain, error) { +func (p *CartPool) Get(id uint64) (*CartGrain, error) { grain, err := p.getOrClaimGrain(id) if err != nil { return nil, err @@ -674,7 +512,7 @@ func (p *CartPool) Get(id CartId) (*CartGrain, error) { } // OwnerHost reports the remote owner (if any) for the supplied cart id. -func (p *CartPool) OwnerHost(id CartId) (Host, bool) { +func (p *CartPool) OwnerHost(id uint64) (actor.Host, bool) { p.remoteMu.RLock() defer p.remoteMu.RUnlock() owner, ok := p.remoteOwners[id] @@ -691,11 +529,8 @@ func (p *CartPool) Close() { p.remoteMu.Lock() defer p.remoteMu.Unlock() for _, r := range p.remoteHosts { - go func(rh *RemoteHostGRPC) { - _, err := rh.ControlClient.Closing(context.Background(), &messages.ClosingNotice{Host: p.hostname}) - if err != nil { - log.Printf("Close notify to %s failed: %v", rh.Host, err) - } + go func(rh *proxy.RemoteHost) { + rh.Close() }(r) } if p.purgeTicker != nil { diff --git a/cart-grain.go b/cart-grain.go index d36f10f..8998d69 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -3,10 +3,11 @@ package main import ( "encoding/json" "fmt" + "slices" "sync" "time" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // Legacy padded [16]byte CartId and its helper methods removed. @@ -61,7 +62,8 @@ type CartGrain struct { mu sync.RWMutex lastItemId int lastDeliveryId int - lastChange int64 // unix seconds of last successful mutation (replay sets from event ts) + lastAccess time.Time + lastChange time.Time // unix seconds of last successful mutation (replay sets from event ts) Id CartId `json:"id"` Items []*CartItem `json:"items"` TotalPrice int64 `json:"totalPrice"` @@ -74,21 +76,20 @@ type CartGrain struct { PaymentStatus string `json:"paymentStatus,omitempty"` } -type Grain interface { - GetId() CartId - Apply(content interface{}, isReplay bool) (*CartGrain, error) - GetCurrentState() (*CartGrain, error) -} - func (c *CartGrain) GetId() CartId { return c.Id } -func (c *CartGrain) GetLastChange() int64 { +func (c *CartGrain) GetLastChange() time.Time { return c.lastChange } +func (c *CartGrain) GetLastAccess() time.Time { + return c.lastAccess +} + func (c *CartGrain) GetCurrentState() (*CartGrain, error) { + c.lastAccess = time.Now() return c, nil } @@ -195,13 +196,7 @@ func (c *CartGrain) ItemsWithoutDelivery() []int { ret := make([]int, 0, len(c.Items)) hasDelivery := c.ItemsWithDelivery() for _, item := range c.Items { - found := false - for _, id := range hasDelivery { - if item.Id == id { - found = true - break - } - } + found := slices.Contains(hasDelivery, item.Id) if !found { ret = append(ret, item.Id) @@ -239,7 +234,8 @@ func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error // Sliding TTL: update lastChange only for non-replay successful mutations. if updated != nil && !isReplay { - c.lastChange = time.Now().Unix() + c.lastChange = time.Now() + c.lastAccess = time.Now() _ = AppendCartEvent(c.Id, content) } diff --git a/discarded-host.go b/discarded-host.go deleted file mode 100644 index e7d560a..0000000 --- a/discarded-host.go +++ /dev/null @@ -1,84 +0,0 @@ -package main - -import ( - "fmt" - "log" - "net" - "sync" - "time" -) - -type DiscardedHost struct { - Host string - Tries int -} - -type DiscardedHostHandler struct { - mu sync.RWMutex - port int - hosts []*DiscardedHost - onConnection *func(string) -} - -func (d *DiscardedHostHandler) run() { - for range time.Tick(time.Second) { - d.mu.RLock() - lst := make([]*DiscardedHost, 0, len(d.hosts)) - for _, host := range d.hosts { - if host.Tries >= 0 && host.Tries < 5 { - go d.testConnection(host) - lst = append(lst, host) - } else { - if host.Tries > 0 { - log.Printf("Host %s discarded after %d tries", host.Host, host.Tries) - } - } - } - d.mu.RUnlock() - d.mu.Lock() - d.hosts = lst - d.mu.Unlock() - } - -} - -func (d *DiscardedHostHandler) testConnection(host *DiscardedHost) { - addr := fmt.Sprintf("%s:%d", host.Host, d.port) - conn, err := net.Dial("tcp", addr) - - if err != nil { - host.Tries++ - if host.Tries >= 5 { - // Exceeded retry threshold; will be dropped by run loop. - } - } else { - conn.Close() - if d.onConnection != nil { - fn := *d.onConnection - fn(host.Host) - } - } -} - -func NewDiscardedHostHandler(port int) *DiscardedHostHandler { - ret := &DiscardedHostHandler{ - hosts: make([]*DiscardedHost, 0), - port: port, - } - go ret.run() - return ret -} - -func (d *DiscardedHostHandler) SetReconnectHandler(fn func(string)) { - d.onConnection = &fn -} - -func (d *DiscardedHostHandler) AppendHost(host string) { - d.mu.Lock() - defer d.mu.Unlock() - log.Printf("Adding host %s to retry list", host) - d.hosts = append(d.hosts, &DiscardedHost{ - Host: host, - Tries: 0, - }) -} diff --git a/disk-storage.go b/disk-storage.go index 874a580..b525a91 100644 --- a/disk-storage.go +++ b/disk-storage.go @@ -12,14 +12,14 @@ func init() { type DiskStorage struct { stateFile string - lastSave int64 - LastSaves map[uint64]int64 + lastSave time.Time + LastSaves map[uint64]time.Time } func NewDiskStorage(stateFile string) (*DiskStorage, error) { ret := &DiskStorage{ stateFile: stateFile, - LastSaves: make(map[uint64]int64), + LastSaves: make(map[uint64]time.Time), } //err := ret.loadState() return ret, nil @@ -66,7 +66,7 @@ func NewDiskStorage(stateFile string) (*DiskStorage, error) { func (s *DiskStorage) Store(id CartId, _ *CartGrain) error { // With the removal of the legacy message log, we only update the timestamp. - ts := time.Now().Unix() + ts := time.Now() s.LastSaves[uint64(id)] = ts s.lastSave = ts return nil diff --git a/event_log.go b/event_log.go index 0481459..356b3f9 100644 --- a/event_log.go +++ b/event_log.go @@ -11,7 +11,7 @@ import ( "sync" "time" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -229,7 +229,7 @@ func ReplayCartEvents(grain *CartGrain, id CartId) error { for scanner.Scan() { line := scanner.Bytes() var raw struct { - Timestamp int64 `json:"ts"` + Timestamp time.Time `json:"ts"` Type string `json:"type"` Payload json.RawMessage `json:"payload"` } diff --git a/grpc_server.go b/grpc_server.go deleted file mode 100644 index a570eab..0000000 --- a/grpc_server.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "net" - "time" - - messages "git.tornberg.me/go-cart-actor/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" -) - -// cartActorGRPCServer implements the ControlPlane gRPC services. -// It delegates cart operations to a grain pool and cluster operations to a synced pool. -type cartActorGRPCServer struct { - messages.UnimplementedControlPlaneServer - - pool *CartPool -} - -// NewCartActorGRPCServer creates and initializes the server. -func NewCartActorGRPCServer(pool *CartPool) *cartActorGRPCServer { - return &cartActorGRPCServer{ - pool: pool, - } -} - -func (s *cartActorGRPCServer) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { - for _, cartId := range req.CartIds { - s.pool.removeLocalGrain(CartId(cartId)) - } - log.Printf("Ack count: %d", len(req.CartIds)) - return &messages.OwnerChangeAck{ - Accepted: true, - Message: "ownership announced", - }, nil -} - -func (s *cartActorGRPCServer) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { - s.pool.HandleRemoteExpiry(req.GetHost(), req.GetCartIds()) - return &messages.OwnerChangeAck{ - Accepted: true, - Message: "expiry acknowledged", - }, nil -} - -// ControlPlane: Ping -func (s *cartActorGRPCServer) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { - // Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata. - // Gateways that propagate Set-Cookie can help establish sticky sessions at the edge. - //_ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) - return &messages.PingReply{ - Host: s.pool.Hostname(), - UnixTime: time.Now().Unix(), - }, nil -} - -// ControlPlane: Negotiate (merge host views) -func (s *cartActorGRPCServer) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { - hostSet := make(map[string]struct{}) - // Caller view - for _, h := range req.GetKnownHosts() { - if h != "" { - hostSet[h] = struct{}{} - } - } - // This host - hostSet[s.pool.Hostname()] = struct{}{} - // Known remotes - for _, h := range s.pool.RemoteHostNames() { - hostSet[h] = struct{}{} - } - - out := make([]string, 0, len(hostSet)) - for h := range hostSet { - out = append(out, h) - } - return &messages.NegotiateReply{Hosts: out}, nil -} - -// ControlPlane: GetCartIds (locally owned carts only) -func (s *cartActorGRPCServer) GetCartIds(ctx context.Context, _ *messages.Empty) (*messages.CartIdsReply, error) { - return &messages.CartIdsReply{CartIds: s.pool.LocalCartIDs()}, nil -} - -// ControlPlane: Closing (peer shutdown notification) -func (s *cartActorGRPCServer) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { - if req.GetHost() != "" { - s.pool.RemoveHost(req.GetHost()) - } - return &messages.OwnerChangeAck{ - Accepted: true, - Message: "removed host", - }, nil -} - -// StartGRPCServer configures and starts the unified gRPC server on the given address. -// It registers both the CartActor and ControlPlane services. -func StartGRPCServer(addr string, pool *CartPool) (*grpc.Server, error) { - lis, err := net.Listen("tcp", addr) - if err != nil { - return nil, fmt.Errorf("failed to listen: %w", err) - } - - grpcServer := grpc.NewServer() - server := NewCartActorGRPCServer(pool) - - messages.RegisterControlPlaneServer(grpcServer, server) - reflection.Register(grpcServer) - - log.Printf("gRPC server listening on %s", addr) - go func() { - if err := grpcServer.Serve(lis); err != nil { - log.Fatalf("failed to serve gRPC: %v", err) - } - }() - - return grpcServer, nil -} diff --git a/main.go b/main.go index 634112c..fc2eea2 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,9 @@ import ( "syscall" "time" - messages "git.tornberg.me/go-cart-actor/proto" + "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/discovery" + messages "git.tornberg.me/go-cart-actor/pkg/messages" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -46,7 +48,8 @@ func spawn(id CartId) (*CartGrain, error) { TotalPrice: 0, } // Set baseline lastChange at spawn; replay may update it to last event timestamp. - ret.lastChange = time.Now().Unix() + ret.lastChange = time.Now() + ret.lastAccess = time.Now() // Legacy loadMessages (no-op) retained; then replay append-only event log //_ = loadMessages(ret, id) @@ -69,7 +72,7 @@ func (a *App) Save() error { if grain == nil { continue } - if grain.GetLastChange() > a.storage.LastSaves[uint64(id)] { + if grain.GetLastChange().After(a.storage.LastSaves[uint64(id)]) { err := a.storage.Store(id, grain) if err != nil { @@ -106,7 +109,7 @@ func getCountryFromHost(host string) string { return "se" } -func GetDiscovery() Discovery { +func GetDiscovery() discovery.Discovery { if podIp == "" { return nil } @@ -120,7 +123,7 @@ func GetDiscovery() Discovery { if err != nil { log.Fatalf("Error creating client: %v\n", err) } - return NewK8sDiscovery(client) + return discovery.NewK8sDiscovery(client) } func main() { @@ -137,9 +140,9 @@ func main() { storage: storage, } - grpcSrv, err := StartGRPCServer(":1337", pool) + grpcSrv, err := actor.NewControlServer(":1337", pool) if err != nil { - log.Fatalf("Error starting gRPC server: %v\n", err) + log.Fatalf("Error starting control plane gRPC server: %v\n", err) } defer grpcSrv.GracefulStop() @@ -236,7 +239,7 @@ func main() { w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Permissions-Policy", "payment=(self \"https://js.stripe.com\" \"https://m.stripe.network\" \"https://js.playground.kustom.co\")") w.WriteHeader(http.StatusOK) - w.Write([]byte(fmt.Sprintf(tpl, order.HTMLSnippet))) + fmt.Fprintf(w, tpl, order.HTMLSnippet) }) mux.HandleFunc("/confirmation/{order_id}", func(w http.ResponseWriter, r *http.Request) { @@ -263,7 +266,7 @@ func main() { } w.WriteHeader(http.StatusOK) - w.Write([]byte(fmt.Sprintf(tpl, order.HTMLSnippet))) + fmt.Fprintf(w, tpl, order.HTMLSnippet) }) mux.HandleFunc("/validate", func(w http.ResponseWriter, r *http.Request) { log.Printf("Klarna order validation, method: %s", r.Method) @@ -362,7 +365,7 @@ func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutO if !ok { return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } - _, applyErr := syncedServer.pool.Apply(cid, mutation) + _, applyErr := syncedServer.pool.Apply(uint64(cid), mutation) if applyErr == nil { _ = AppendCartEvent(cid, mutation) } diff --git a/mutation_add_item.go b/mutation_add_item.go index 6053d3d..031367e 100644 --- a/mutation_add_item.go +++ b/mutation_add_item.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_add_item.go diff --git a/mutation_add_request.go b/mutation_add_request.go index a4ad874..0b04f39 100644 --- a/mutation_add_request.go +++ b/mutation_add_request.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_add_request.go diff --git a/mutation_change_quantity.go b/mutation_change_quantity.go index d311ce9..cc275e6 100644 --- a/mutation_change_quantity.go +++ b/mutation_change_quantity.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_change_quantity.go diff --git a/mutation_initialize_checkout.go b/mutation_initialize_checkout.go index f43c128..0d64988 100644 --- a/mutation_initialize_checkout.go +++ b/mutation_initialize_checkout.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_initialize_checkout.go diff --git a/mutation_order_created.go b/mutation_order_created.go index b83a2cb..aa978ae 100644 --- a/mutation_order_created.go +++ b/mutation_order_created.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_order_created.go diff --git a/mutation_remove_delivery.go b/mutation_remove_delivery.go index 49e9dcf..5f4f562 100644 --- a/mutation_remove_delivery.go +++ b/mutation_remove_delivery.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_remove_delivery.go diff --git a/mutation_remove_item.go b/mutation_remove_item.go index 420eff0..0e9913d 100644 --- a/mutation_remove_item.go +++ b/mutation_remove_item.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_remove_item.go diff --git a/mutation_set_cart_items.go b/mutation_set_cart_items.go index aa811a7..edaf352 100644 --- a/mutation_set_cart_items.go +++ b/mutation_set_cart_items.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_set_cart_items.go diff --git a/mutation_set_delivery.go b/mutation_set_delivery.go index 3cea3c7..72b5293 100644 --- a/mutation_set_delivery.go +++ b/mutation_set_delivery.go @@ -4,7 +4,7 @@ import ( "fmt" "slices" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_set_delivery.go diff --git a/mutation_set_pickup_point.go b/mutation_set_pickup_point.go index ba8b8fe..5bbe8b9 100644 --- a/mutation_set_pickup_point.go +++ b/mutation_set_pickup_point.go @@ -3,7 +3,7 @@ package main import ( "fmt" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) // mutation_set_pickup_point.go diff --git a/pkg/actor/grain.go b/pkg/actor/grain.go new file mode 100644 index 0000000..c5d7967 --- /dev/null +++ b/pkg/actor/grain.go @@ -0,0 +1,11 @@ +package actor + +import "time" + +type Grain[V any] interface { + GetId() uint64 + Apply(content any, isReplay bool) (*V, error) + GetLastAccess() time.Time + GetLastChange() time.Time + GetCurrentState() (*V, error) +} diff --git a/pkg/actor/grain_pool.go b/pkg/actor/grain_pool.go new file mode 100644 index 0000000..e2777e3 --- /dev/null +++ b/pkg/actor/grain_pool.go @@ -0,0 +1,25 @@ +package actor + +import "net/http" + +type GrainPool[V any] interface { + Apply(id uint64, mutation any) (V, error) + Get(id uint64) (V, error) + OwnerHost(id uint64) (Host, bool) + Hostname() string + TakeOwnership(id uint64) + HandleOwnershipChange(host string, ids []uint64) error + HandleRemoteExpiry(host string, ids []uint64) error + Negotiate(otherHosts []string) + GetLocalIds() []uint64 + RemoveHost(host string) + IsHealthy() bool + Close() +} + +// Host abstracts a remote node capable of proxying cart requests. +type Host interface { + Name() string + Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) + GetActorIds() []uint64 +} diff --git a/pkg/actor/grpc_server.go b/pkg/actor/grpc_server.go new file mode 100644 index 0000000..98b34dd --- /dev/null +++ b/pkg/actor/grpc_server.go @@ -0,0 +1,105 @@ +package actor + +import ( + "context" + "fmt" + "log" + "net" + "time" + + messages "git.tornberg.me/go-cart-actor/pkg/messages" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +// ControlServer implements the ControlPlane gRPC services. +// It delegates to a grain pool and cluster operations to a synced pool. +type ControlServer[V any] struct { + messages.UnimplementedControlPlaneServer + + pool GrainPool[V] +} + +func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) { + err := s.pool.HandleOwnershipChange(req.Host, req.Ids) + if err != nil { + return &messages.OwnerChangeAck{ + Accepted: false, + Message: "owner change failed", + }, err + } + + log.Printf("Ack count: %d", len(req.Ids)) + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "ownership announced", + }, nil +} + +func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) { + err := s.pool.HandleRemoteExpiry(req.Host, req.Ids) + return &messages.OwnerChangeAck{ + Accepted: err == nil, + Message: "expiry acknowledged", + }, nil +} + +// ControlPlane: Ping +func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) { + // Expose cart owner cookie (first-touch owner = this host) for HTTP gateways translating gRPC metadata. + // Gateways that propagate Set-Cookie can help establish sticky sessions at the edge. + //_ = grpc.SendHeader(ctx, metadata.Pairs("set-cookie", fmt.Sprintf("cartowner=%s; Path=/; HttpOnly", s.syncedPool.Hostname()))) + return &messages.PingReply{ + Host: s.pool.Hostname(), + UnixTime: time.Now().Unix(), + }, nil +} + +// ControlPlane: Negotiate (merge host views) +func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) { + + s.pool.Negotiate(req.KnownHosts) + return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil +} + +// ControlPlane: GetCartIds (locally owned carts only) +func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) { + return &messages.ActorIdsReply{Ids: s.pool.GetLocalIds()}, nil +} + +// ControlPlane: Closing (peer shutdown notification) +func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) { + if req.GetHost() != "" { + s.pool.RemoveHost(req.GetHost()) + } + return &messages.OwnerChangeAck{ + Accepted: true, + Message: "removed host", + }, nil +} + +// StartGRPCServer configures and starts the unified gRPC server on the given address. +// It registers both the CartActor and ControlPlane services. +func NewControlServer[V any](addr string, pool GrainPool[V]) (*grpc.Server, error) { + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen: %w", err) + } + + grpcServer := grpc.NewServer() + server := &ControlServer[V]{ + pool: pool, + } + + messages.RegisterControlPlaneServer(grpcServer, server) + reflection.Register(grpcServer) + + log.Printf("gRPC server listening on %s", addr) + go func() { + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("failed to serve gRPC: %v", err) + } + }() + + return grpcServer, nil +} diff --git a/discovery.go b/pkg/discovery/discovery.go similarity index 93% rename from discovery.go rename to pkg/discovery/discovery.go index 20952a9..21b7d36 100644 --- a/discovery.go +++ b/pkg/discovery/discovery.go @@ -1,4 +1,4 @@ -package main +package discovery import ( "context" @@ -11,11 +11,6 @@ import ( toolsWatch "k8s.io/client-go/tools/watch" ) -type Discovery interface { - Discover() ([]string, error) - Watch() (<-chan HostChange, error) -} - type K8sDiscovery struct { ctx context.Context client *kubernetes.Clientset diff --git a/discovery_mock.go b/pkg/discovery/discovery_mock.go similarity index 99% rename from discovery_mock.go rename to pkg/discovery/discovery_mock.go index c770d61..85f791a 100644 --- a/discovery_mock.go +++ b/pkg/discovery/discovery_mock.go @@ -1,4 +1,4 @@ -package main +package discovery import ( "context" diff --git a/discovery_test.go b/pkg/discovery/discovery_test.go similarity index 98% rename from discovery_test.go rename to pkg/discovery/discovery_test.go index fa92ff3..e2a955c 100644 --- a/discovery_test.go +++ b/pkg/discovery/discovery_test.go @@ -1,4 +1,4 @@ -package main +package discovery import ( "testing" diff --git a/pkg/discovery/types.go b/pkg/discovery/types.go new file mode 100644 index 0000000..6613553 --- /dev/null +++ b/pkg/discovery/types.go @@ -0,0 +1,6 @@ +package discovery + +type Discovery interface { + Discover() ([]string, error) + Watch() (<-chan HostChange, error) +} diff --git a/proto/control_plane.pb.go b/pkg/messages/control_plane.pb.go similarity index 73% rename from proto/control_plane.pb.go rename to pkg/messages/control_plane.pb.go index 6478a54..1bd4ef8 100644 --- a/proto/control_plane.pb.go +++ b/pkg/messages/control_plane.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 +// protoc-gen-go v1.36.10 // protoc v6.32.1 -// source: proto/control_plane.proto +// source: control_plane.proto package messages @@ -30,7 +30,7 @@ type Empty struct { func (x *Empty) Reset() { *x = Empty{} - mi := &file_proto_control_plane_proto_msgTypes[0] + mi := &file_control_plane_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -42,7 +42,7 @@ func (x *Empty) String() string { func (*Empty) ProtoMessage() {} func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[0] + mi := &file_control_plane_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -55,7 +55,7 @@ func (x *Empty) ProtoReflect() protoreflect.Message { // Deprecated: Use Empty.ProtoReflect.Descriptor instead. func (*Empty) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{0} + return file_control_plane_proto_rawDescGZIP(), []int{0} } // Ping reply includes responding host and its current unix time (seconds). @@ -69,7 +69,7 @@ type PingReply struct { func (x *PingReply) Reset() { *x = PingReply{} - mi := &file_proto_control_plane_proto_msgTypes[1] + mi := &file_control_plane_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -81,7 +81,7 @@ func (x *PingReply) String() string { func (*PingReply) ProtoMessage() {} func (x *PingReply) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[1] + mi := &file_control_plane_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -94,7 +94,7 @@ func (x *PingReply) ProtoReflect() protoreflect.Message { // Deprecated: Use PingReply.ProtoReflect.Descriptor instead. func (*PingReply) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{1} + return file_control_plane_proto_rawDescGZIP(), []int{1} } func (x *PingReply) GetHost() string { @@ -121,7 +121,7 @@ type NegotiateRequest struct { func (x *NegotiateRequest) Reset() { *x = NegotiateRequest{} - mi := &file_proto_control_plane_proto_msgTypes[2] + mi := &file_control_plane_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -133,7 +133,7 @@ func (x *NegotiateRequest) String() string { func (*NegotiateRequest) ProtoMessage() {} func (x *NegotiateRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[2] + mi := &file_control_plane_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -146,7 +146,7 @@ func (x *NegotiateRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use NegotiateRequest.ProtoReflect.Descriptor instead. func (*NegotiateRequest) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{2} + return file_control_plane_proto_rawDescGZIP(), []int{2} } func (x *NegotiateRequest) GetKnownHosts() []string { @@ -166,7 +166,7 @@ type NegotiateReply struct { func (x *NegotiateReply) Reset() { *x = NegotiateReply{} - mi := &file_proto_control_plane_proto_msgTypes[3] + mi := &file_control_plane_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -178,7 +178,7 @@ func (x *NegotiateReply) String() string { func (*NegotiateReply) ProtoMessage() {} func (x *NegotiateReply) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[3] + mi := &file_control_plane_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -191,7 +191,7 @@ func (x *NegotiateReply) ProtoReflect() protoreflect.Message { // Deprecated: Use NegotiateReply.ProtoReflect.Descriptor instead. func (*NegotiateReply) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{3} + return file_control_plane_proto_rawDescGZIP(), []int{3} } func (x *NegotiateReply) GetHosts() []string { @@ -202,28 +202,28 @@ func (x *NegotiateReply) GetHosts() []string { } // CartIdsReply returns the list of cart IDs (string form) currently owned locally. -type CartIdsReply struct { +type ActorIdsReply struct { state protoimpl.MessageState `protogen:"open.v1"` - CartIds []uint64 `protobuf:"varint,1,rep,packed,name=cart_ids,json=cartIds,proto3" json:"cart_ids,omitempty"` + Ids []uint64 `protobuf:"varint,1,rep,packed,name=ids,proto3" json:"ids,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *CartIdsReply) Reset() { - *x = CartIdsReply{} - mi := &file_proto_control_plane_proto_msgTypes[4] +func (x *ActorIdsReply) Reset() { + *x = ActorIdsReply{} + mi := &file_control_plane_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *CartIdsReply) String() string { +func (x *ActorIdsReply) String() string { return protoimpl.X.MessageStringOf(x) } -func (*CartIdsReply) ProtoMessage() {} +func (*ActorIdsReply) ProtoMessage() {} -func (x *CartIdsReply) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[4] +func (x *ActorIdsReply) ProtoReflect() protoreflect.Message { + mi := &file_control_plane_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -234,14 +234,14 @@ func (x *CartIdsReply) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use CartIdsReply.ProtoReflect.Descriptor instead. -func (*CartIdsReply) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{4} +// Deprecated: Use ActorIdsReply.ProtoReflect.Descriptor instead. +func (*ActorIdsReply) Descriptor() ([]byte, []int) { + return file_control_plane_proto_rawDescGZIP(), []int{4} } -func (x *CartIdsReply) GetCartIds() []uint64 { +func (x *ActorIdsReply) GetIds() []uint64 { if x != nil { - return x.CartIds + return x.Ids } return nil } @@ -257,7 +257,7 @@ type OwnerChangeAck struct { func (x *OwnerChangeAck) Reset() { *x = OwnerChangeAck{} - mi := &file_proto_control_plane_proto_msgTypes[5] + mi := &file_control_plane_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -269,7 +269,7 @@ func (x *OwnerChangeAck) String() string { func (*OwnerChangeAck) ProtoMessage() {} func (x *OwnerChangeAck) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[5] + mi := &file_control_plane_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -282,7 +282,7 @@ func (x *OwnerChangeAck) ProtoReflect() protoreflect.Message { // Deprecated: Use OwnerChangeAck.ProtoReflect.Descriptor instead. func (*OwnerChangeAck) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{5} + return file_control_plane_proto_rawDescGZIP(), []int{5} } func (x *OwnerChangeAck) GetAccepted() bool { @@ -309,7 +309,7 @@ type ClosingNotice struct { func (x *ClosingNotice) Reset() { *x = ClosingNotice{} - mi := &file_proto_control_plane_proto_msgTypes[6] + mi := &file_control_plane_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -321,7 +321,7 @@ func (x *ClosingNotice) String() string { func (*ClosingNotice) ProtoMessage() {} func (x *ClosingNotice) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[6] + mi := &file_control_plane_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -334,7 +334,7 @@ func (x *ClosingNotice) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosingNotice.ProtoReflect.Descriptor instead. func (*ClosingNotice) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{6} + return file_control_plane_proto_rawDescGZIP(), []int{6} } func (x *ClosingNotice) GetHost() string { @@ -348,15 +348,15 @@ func (x *ClosingNotice) GetHost() string { // First claim wins; receivers SHOULD NOT overwrite an existing different owner. type OwnershipAnnounce struct { state protoimpl.MessageState `protogen:"open.v1"` - Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // announcing host - CartIds []uint64 `protobuf:"varint,2,rep,packed,name=cart_ids,json=cartIds,proto3" json:"cart_ids,omitempty"` // newly claimed cart ids + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // announcing host + Ids []uint64 `protobuf:"varint,2,rep,packed,name=ids,proto3" json:"ids,omitempty"` // newly claimed cart ids unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *OwnershipAnnounce) Reset() { *x = OwnershipAnnounce{} - mi := &file_proto_control_plane_proto_msgTypes[7] + mi := &file_control_plane_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -368,7 +368,7 @@ func (x *OwnershipAnnounce) String() string { func (*OwnershipAnnounce) ProtoMessage() {} func (x *OwnershipAnnounce) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[7] + mi := &file_control_plane_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -381,7 +381,7 @@ func (x *OwnershipAnnounce) ProtoReflect() protoreflect.Message { // Deprecated: Use OwnershipAnnounce.ProtoReflect.Descriptor instead. func (*OwnershipAnnounce) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{7} + return file_control_plane_proto_rawDescGZIP(), []int{7} } func (x *OwnershipAnnounce) GetHost() string { @@ -391,9 +391,9 @@ func (x *OwnershipAnnounce) GetHost() string { return "" } -func (x *OwnershipAnnounce) GetCartIds() []uint64 { +func (x *OwnershipAnnounce) GetIds() []uint64 { if x != nil { - return x.CartIds + return x.Ids } return nil } @@ -402,14 +402,14 @@ func (x *OwnershipAnnounce) GetCartIds() []uint64 { type ExpiryAnnounce struct { state protoimpl.MessageState `protogen:"open.v1"` Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` - CartIds []uint64 `protobuf:"varint,2,rep,packed,name=cart_ids,json=cartIds,proto3" json:"cart_ids,omitempty"` + Ids []uint64 `protobuf:"varint,2,rep,packed,name=ids,proto3" json:"ids,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *ExpiryAnnounce) Reset() { *x = ExpiryAnnounce{} - mi := &file_proto_control_plane_proto_msgTypes[8] + mi := &file_control_plane_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -421,7 +421,7 @@ func (x *ExpiryAnnounce) String() string { func (*ExpiryAnnounce) ProtoMessage() {} func (x *ExpiryAnnounce) ProtoReflect() protoreflect.Message { - mi := &file_proto_control_plane_proto_msgTypes[8] + mi := &file_control_plane_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -434,7 +434,7 @@ func (x *ExpiryAnnounce) ProtoReflect() protoreflect.Message { // Deprecated: Use ExpiryAnnounce.ProtoReflect.Descriptor instead. func (*ExpiryAnnounce) Descriptor() ([]byte, []int) { - return file_proto_control_plane_proto_rawDescGZIP(), []int{8} + return file_control_plane_proto_rawDescGZIP(), []int{8} } func (x *ExpiryAnnounce) GetHost() string { @@ -444,18 +444,18 @@ func (x *ExpiryAnnounce) GetHost() string { return "" } -func (x *ExpiryAnnounce) GetCartIds() []uint64 { +func (x *ExpiryAnnounce) GetIds() []uint64 { if x != nil { - return x.CartIds + return x.Ids } return nil } -var File_proto_control_plane_proto protoreflect.FileDescriptor +var File_control_plane_proto protoreflect.FileDescriptor -const file_proto_control_plane_proto_rawDesc = "" + +const file_control_plane_proto_rawDesc = "" + "\n" + - "\x19proto/control_plane.proto\x12\bmessages\"\a\n" + + "\x13control_plane.proto\x12\bmessages\"\a\n" + "\x05Empty\"<\n" + "\tPingReply\x12\x12\n" + "\x04host\x18\x01 \x01(\tR\x04host\x12\x1b\n" + @@ -464,63 +464,62 @@ const file_proto_control_plane_proto_rawDesc = "" + "\vknown_hosts\x18\x01 \x03(\tR\n" + "knownHosts\"&\n" + "\x0eNegotiateReply\x12\x14\n" + - "\x05hosts\x18\x01 \x03(\tR\x05hosts\")\n" + - "\fCartIdsReply\x12\x19\n" + - "\bcart_ids\x18\x01 \x03(\x04R\acartIds\"F\n" + + "\x05hosts\x18\x01 \x03(\tR\x05hosts\"!\n" + + "\rActorIdsReply\x12\x10\n" + + "\x03ids\x18\x01 \x03(\x04R\x03ids\"F\n" + "\x0eOwnerChangeAck\x12\x1a\n" + "\baccepted\x18\x01 \x01(\bR\baccepted\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\"#\n" + "\rClosingNotice\x12\x12\n" + - "\x04host\x18\x01 \x01(\tR\x04host\"B\n" + + "\x04host\x18\x01 \x01(\tR\x04host\"9\n" + "\x11OwnershipAnnounce\x12\x12\n" + - "\x04host\x18\x01 \x01(\tR\x04host\x12\x19\n" + - "\bcart_ids\x18\x02 \x03(\x04R\acartIds\"?\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + + "\x03ids\x18\x02 \x03(\x04R\x03ids\"6\n" + "\x0eExpiryAnnounce\x12\x12\n" + - "\x04host\x18\x01 \x01(\tR\x04host\x12\x19\n" + - "\bcart_ids\x18\x02 \x03(\x04R\acartIds2\x86\x03\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + + "\x03ids\x18\x02 \x03(\x04R\x03ids2\x8d\x03\n" + "\fControlPlane\x12,\n" + "\x04Ping\x12\x0f.messages.Empty\x1a\x13.messages.PingReply\x12A\n" + - "\tNegotiate\x12\x1a.messages.NegotiateRequest\x1a\x18.messages.NegotiateReply\x125\n" + - "\n" + - "GetCartIds\x12\x0f.messages.Empty\x1a\x16.messages.CartIdsReply\x12J\n" + + "\tNegotiate\x12\x1a.messages.NegotiateRequest\x1a\x18.messages.NegotiateReply\x12<\n" + + "\x10GetLocalActorIds\x12\x0f.messages.Empty\x1a\x17.messages.ActorIdsReply\x12J\n" + "\x11AnnounceOwnership\x12\x1b.messages.OwnershipAnnounce\x1a\x18.messages.OwnerChangeAck\x12D\n" + "\x0eAnnounceExpiry\x12\x18.messages.ExpiryAnnounce\x1a\x18.messages.OwnerChangeAck\x12<\n" + "\aClosing\x12\x17.messages.ClosingNotice\x1a\x18.messages.OwnerChangeAckB.Z,git.tornberg.me/go-cart-actor/proto;messagesb\x06proto3" var ( - file_proto_control_plane_proto_rawDescOnce sync.Once - file_proto_control_plane_proto_rawDescData []byte + file_control_plane_proto_rawDescOnce sync.Once + file_control_plane_proto_rawDescData []byte ) -func file_proto_control_plane_proto_rawDescGZIP() []byte { - file_proto_control_plane_proto_rawDescOnce.Do(func() { - file_proto_control_plane_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_control_plane_proto_rawDesc), len(file_proto_control_plane_proto_rawDesc))) +func file_control_plane_proto_rawDescGZIP() []byte { + file_control_plane_proto_rawDescOnce.Do(func() { + file_control_plane_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc))) }) - return file_proto_control_plane_proto_rawDescData + return file_control_plane_proto_rawDescData } -var file_proto_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 9) -var file_proto_control_plane_proto_goTypes = []any{ +var file_control_plane_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_control_plane_proto_goTypes = []any{ (*Empty)(nil), // 0: messages.Empty (*PingReply)(nil), // 1: messages.PingReply (*NegotiateRequest)(nil), // 2: messages.NegotiateRequest (*NegotiateReply)(nil), // 3: messages.NegotiateReply - (*CartIdsReply)(nil), // 4: messages.CartIdsReply + (*ActorIdsReply)(nil), // 4: messages.ActorIdsReply (*OwnerChangeAck)(nil), // 5: messages.OwnerChangeAck (*ClosingNotice)(nil), // 6: messages.ClosingNotice (*OwnershipAnnounce)(nil), // 7: messages.OwnershipAnnounce (*ExpiryAnnounce)(nil), // 8: messages.ExpiryAnnounce } -var file_proto_control_plane_proto_depIdxs = []int32{ +var file_control_plane_proto_depIdxs = []int32{ 0, // 0: messages.ControlPlane.Ping:input_type -> messages.Empty 2, // 1: messages.ControlPlane.Negotiate:input_type -> messages.NegotiateRequest - 0, // 2: messages.ControlPlane.GetCartIds:input_type -> messages.Empty + 0, // 2: messages.ControlPlane.GetLocalActorIds:input_type -> messages.Empty 7, // 3: messages.ControlPlane.AnnounceOwnership:input_type -> messages.OwnershipAnnounce 8, // 4: messages.ControlPlane.AnnounceExpiry:input_type -> messages.ExpiryAnnounce 6, // 5: messages.ControlPlane.Closing:input_type -> messages.ClosingNotice 1, // 6: messages.ControlPlane.Ping:output_type -> messages.PingReply 3, // 7: messages.ControlPlane.Negotiate:output_type -> messages.NegotiateReply - 4, // 8: messages.ControlPlane.GetCartIds:output_type -> messages.CartIdsReply + 4, // 8: messages.ControlPlane.GetLocalActorIds:output_type -> messages.ActorIdsReply 5, // 9: messages.ControlPlane.AnnounceOwnership:output_type -> messages.OwnerChangeAck 5, // 10: messages.ControlPlane.AnnounceExpiry:output_type -> messages.OwnerChangeAck 5, // 11: messages.ControlPlane.Closing:output_type -> messages.OwnerChangeAck @@ -531,26 +530,26 @@ var file_proto_control_plane_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_proto_control_plane_proto_init() } -func file_proto_control_plane_proto_init() { - if File_proto_control_plane_proto != nil { +func init() { file_control_plane_proto_init() } +func file_control_plane_proto_init() { + if File_control_plane_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_control_plane_proto_rawDesc), len(file_proto_control_plane_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_control_plane_proto_rawDesc), len(file_control_plane_proto_rawDesc)), NumEnums: 0, NumMessages: 9, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_proto_control_plane_proto_goTypes, - DependencyIndexes: file_proto_control_plane_proto_depIdxs, - MessageInfos: file_proto_control_plane_proto_msgTypes, + GoTypes: file_control_plane_proto_goTypes, + DependencyIndexes: file_control_plane_proto_depIdxs, + MessageInfos: file_control_plane_proto_msgTypes, }.Build() - File_proto_control_plane_proto = out.File - file_proto_control_plane_proto_goTypes = nil - file_proto_control_plane_proto_depIdxs = nil + File_control_plane_proto = out.File + file_control_plane_proto_goTypes = nil + file_control_plane_proto_depIdxs = nil } diff --git a/proto/control_plane_grpc.pb.go b/pkg/messages/control_plane_grpc.pb.go similarity index 90% rename from proto/control_plane_grpc.pb.go rename to pkg/messages/control_plane_grpc.pb.go index 271fbad..08e2815 100644 --- a/proto/control_plane_grpc.pb.go +++ b/pkg/messages/control_plane_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.5.1 // - protoc v6.32.1 -// source: proto/control_plane.proto +// source: control_plane.proto package messages @@ -21,7 +21,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( ControlPlane_Ping_FullMethodName = "/messages.ControlPlane/Ping" ControlPlane_Negotiate_FullMethodName = "/messages.ControlPlane/Negotiate" - ControlPlane_GetCartIds_FullMethodName = "/messages.ControlPlane/GetCartIds" + ControlPlane_GetLocalActorIds_FullMethodName = "/messages.ControlPlane/GetLocalActorIds" ControlPlane_AnnounceOwnership_FullMethodName = "/messages.ControlPlane/AnnounceOwnership" ControlPlane_AnnounceExpiry_FullMethodName = "/messages.ControlPlane/AnnounceExpiry" ControlPlane_Closing_FullMethodName = "/messages.ControlPlane/Closing" @@ -38,7 +38,7 @@ type ControlPlaneClient interface { // Negotiate merges host views; used during discovery & convergence. Negotiate(ctx context.Context, in *NegotiateRequest, opts ...grpc.CallOption) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. - GetCartIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CartIdsReply, error) + GetLocalActorIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ActorIdsReply, error) // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). AnnounceOwnership(ctx context.Context, in *OwnershipAnnounce, opts ...grpc.CallOption) (*OwnerChangeAck, error) // Expiry announcement: drop remote ownership hints when local TTL expires. @@ -75,10 +75,10 @@ func (c *controlPlaneClient) Negotiate(ctx context.Context, in *NegotiateRequest return out, nil } -func (c *controlPlaneClient) GetCartIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*CartIdsReply, error) { +func (c *controlPlaneClient) GetLocalActorIds(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ActorIdsReply, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(CartIdsReply) - err := c.cc.Invoke(ctx, ControlPlane_GetCartIds_FullMethodName, in, out, cOpts...) + out := new(ActorIdsReply) + err := c.cc.Invoke(ctx, ControlPlane_GetLocalActorIds_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -126,7 +126,7 @@ type ControlPlaneServer interface { // Negotiate merges host views; used during discovery & convergence. Negotiate(context.Context, *NegotiateRequest) (*NegotiateReply, error) // GetCartIds lists currently owned cart IDs on this node. - GetCartIds(context.Context, *Empty) (*CartIdsReply, error) + GetLocalActorIds(context.Context, *Empty) (*ActorIdsReply, error) // Ownership announcement: first-touch claim broadcast (idempotent; best-effort). AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) // Expiry announcement: drop remote ownership hints when local TTL expires. @@ -149,8 +149,8 @@ func (UnimplementedControlPlaneServer) Ping(context.Context, *Empty) (*PingReply func (UnimplementedControlPlaneServer) Negotiate(context.Context, *NegotiateRequest) (*NegotiateReply, error) { return nil, status.Errorf(codes.Unimplemented, "method Negotiate not implemented") } -func (UnimplementedControlPlaneServer) GetCartIds(context.Context, *Empty) (*CartIdsReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetCartIds not implemented") +func (UnimplementedControlPlaneServer) GetLocalActorIds(context.Context, *Empty) (*ActorIdsReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetLocalActorIds not implemented") } func (UnimplementedControlPlaneServer) AnnounceOwnership(context.Context, *OwnershipAnnounce) (*OwnerChangeAck, error) { return nil, status.Errorf(codes.Unimplemented, "method AnnounceOwnership not implemented") @@ -218,20 +218,20 @@ func _ControlPlane_Negotiate_Handler(srv interface{}, ctx context.Context, dec f return interceptor(ctx, in, info, handler) } -func _ControlPlane_GetCartIds_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ControlPlane_GetLocalActorIds_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ControlPlaneServer).GetCartIds(ctx, in) + return srv.(ControlPlaneServer).GetLocalActorIds(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: ControlPlane_GetCartIds_FullMethodName, + FullMethod: ControlPlane_GetLocalActorIds_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ControlPlaneServer).GetCartIds(ctx, req.(*Empty)) + return srv.(ControlPlaneServer).GetLocalActorIds(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } @@ -306,8 +306,8 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ Handler: _ControlPlane_Negotiate_Handler, }, { - MethodName: "GetCartIds", - Handler: _ControlPlane_GetCartIds_Handler, + MethodName: "GetLocalActorIds", + Handler: _ControlPlane_GetLocalActorIds_Handler, }, { MethodName: "AnnounceOwnership", @@ -323,5 +323,5 @@ var ControlPlane_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "proto/control_plane.proto", + Metadata: "control_plane.proto", } diff --git a/proto/messages.pb.go b/pkg/messages/messages.pb.go similarity index 99% rename from proto/messages.pb.go rename to pkg/messages/messages.pb.go index 506bb46..fa19392 100644 --- a/proto/messages.pb.go +++ b/pkg/messages/messages.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.10 -// protoc v3.21.12 +// protoc v6.32.1 // source: messages.proto package messages diff --git a/remotehost.go b/pkg/proxy/remotehost.go similarity index 64% rename from remotehost.go rename to pkg/proxy/remotehost.go index 348ead0..3b17b4a 100644 --- a/remotehost.go +++ b/pkg/proxy/remotehost.go @@ -1,4 +1,4 @@ -package main +package proxy import ( "bytes" @@ -9,24 +9,24 @@ import ( "net/http" "time" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) -// RemoteHostGRPC mirrors the lightweight controller used for remote node +// RemoteHost mirrors the lightweight controller used for remote node // interaction. -type RemoteHostGRPC struct { +type RemoteHost struct { Host string - HTTPBase string - Conn *grpc.ClientConn - Transport *http.Transport - Client *http.Client - ControlClient messages.ControlPlaneClient + httpBase string + conn *grpc.ClientConn + transport *http.Transport + client *http.Client + controlClient messages.ControlPlaneClient MissedPings int } -func NewRemoteHostGRPC(host string) (*RemoteHostGRPC, error) { +func NewRemoteHost(host string) (*RemoteHost, error) { target := fmt.Sprintf("%s:1337", host) @@ -38,7 +38,7 @@ func NewRemoteHostGRPC(host string) (*RemoteHostGRPC, error) { } controlClient := messages.NewControlPlaneClient(conn) - for retries := 0; retries < 3; retries++ { + for retries := range 3 { ctx, pingCancel := context.WithTimeout(context.Background(), time.Second) _, pingErr := controlClient.Ping(ctx, &messages.Empty{}) pingCancel() @@ -60,32 +60,32 @@ func NewRemoteHostGRPC(host string) (*RemoteHostGRPC, error) { } client := &http.Client{Transport: transport, Timeout: 10 * time.Second} - return &RemoteHostGRPC{ + return &RemoteHost{ Host: host, - HTTPBase: fmt.Sprintf("http://%s:8080/cart", host), - Conn: conn, - Transport: transport, - Client: client, - ControlClient: controlClient, + httpBase: fmt.Sprintf("http://%s:8080/cart", host), + conn: conn, + transport: transport, + client: client, + controlClient: controlClient, MissedPings: 0, }, nil } -func (h *RemoteHostGRPC) Name() string { +func (h *RemoteHost) Name() string { return h.Host } -func (h *RemoteHostGRPC) Close() error { - if h.Conn != nil { - h.Conn.Close() +func (h *RemoteHost) Close() error { + if h.conn != nil { + h.conn.Close() } return nil } -func (h *RemoteHostGRPC) Ping() bool { +func (h *RemoteHost) Ping() bool { ctx, cancel := context.WithTimeout(context.Background(), time.Second) - _, err := h.ControlClient.Ping(ctx, &messages.Empty{}) + _, err := h.controlClient.Ping(ctx, &messages.Empty{}) cancel() if err != nil { h.MissedPings++ @@ -97,11 +97,11 @@ func (h *RemoteHostGRPC) Ping() bool { return true } -func (h *RemoteHostGRPC) Negotiate(knownHosts []string) ([]string, error) { +func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - resp, err := h.ControlClient.Negotiate(ctx, &messages.NegotiateRequest{ + resp, err := h.controlClient.Negotiate(ctx, &messages.NegotiateRequest{ KnownHosts: knownHosts, }) if err != nil { @@ -113,10 +113,22 @@ func (h *RemoteHostGRPC) Negotiate(knownHosts []string) ([]string, error) { return resp.Hosts, nil } -func (h *RemoteHostGRPC) AnnounceOwnership(uids []uint64) { - _, err := h.ControlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ - Host: h.Host, - CartIds: uids, +func (h *RemoteHost) GetActorIds() []uint64 { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{}) + if err != nil { + log.Printf("Init remote %s: GetCartIds error: %v", h.Host, err) + h.MissedPings++ + return []uint64{} + } + return reply.GetIds() +} + +func (h *RemoteHost) AnnounceOwnership(uids []uint64) { + _, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ + Host: h.Host, + Ids: uids, }) if err != nil { log.Printf("ownership announce to %s failed: %v", h.Host, err) @@ -126,10 +138,10 @@ func (h *RemoteHostGRPC) AnnounceOwnership(uids []uint64) { h.MissedPings = 0 } -func (h *RemoteHostGRPC) AnnounceExpiry(uids []uint64) { - _, err := h.ControlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ - Host: h.Host, - CartIds: uids, +func (h *RemoteHost) AnnounceExpiry(uids []uint64) { + _, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ + Host: h.Host, + Ids: uids, }) if err != nil { log.Printf("expiry announce to %s failed: %v", h.Host, err) @@ -139,8 +151,8 @@ func (h *RemoteHostGRPC) AnnounceExpiry(uids []uint64) { h.MissedPings = 0 } -func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request) (bool, error) { - target := fmt.Sprintf("%s%s", h.HTTPBase, r.URL.RequestURI()) +func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) { + target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) var bodyCopy []byte if r.Body != nil && r.Body != http.NoBody { var err error @@ -164,15 +176,13 @@ func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request } r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) req.Header.Set("X-Forwarded-Host", r.Host) - if idStr := id.String(); idStr != "" { - req.Header.Set("X-Cart-Id", idStr) - } + for k, v := range r.Header { for _, vv := range v { req.Header.Add(k, vv) } } - res, err := h.Client.Do(req) + res, err := h.client.Do(req) if err != nil { http.Error(w, "proxy request error", http.StatusBadGateway) return false, err @@ -195,6 +205,6 @@ func (h *RemoteHostGRPC) Proxy(id CartId, w http.ResponseWriter, r *http.Request return false, fmt.Errorf("proxy response status %d", res.StatusCode) } -func (r *RemoteHostGRPC) IsHealthy() bool { +func (r *RemoteHost) IsHealthy() bool { return r.MissedPings < 3 } diff --git a/pool-server.go b/pool-server.go index 769b2ab..4b7bcfa 100644 --- a/pool-server.go +++ b/pool-server.go @@ -9,15 +9,15 @@ import ( "strconv" "time" - messages "git.tornberg.me/go-cart-actor/proto" + messages "git.tornberg.me/go-cart-actor/pkg/messages" ) type PoolServer struct { pod_name string - pool GrainPool + pool *CartPool } -func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { +func NewPoolServer(pool *CartPool, pod_name string) *PoolServer { return &PoolServer{ pod_name: pod_name, pool: pool, @@ -25,7 +25,7 @@ func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { } func (s *PoolServer) process(id CartId, mutation interface{}) (*CartGrain, error) { - grain, err := s.pool.Apply(id, mutation) + grain, err := s.pool.Apply(uint64(id), mutation) if err != nil { return nil, err } @@ -33,7 +33,7 @@ func (s *PoolServer) process(id CartId, mutation interface{}) (*CartGrain, error } func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error { - grain, err := s.pool.Get(id) + grain, err := s.pool.Get(uint64(id)) if err != nil { return err } @@ -50,17 +50,6 @@ func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request, id Car return s.WriteResult(w, data) } -func ErrorHandler(fn func(w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - err := fn(w, r) - if err != nil { - log.Printf("Server error, not remote error: %v\n", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - } - } -} - func (s *PoolServer) WriteResult(w http.ResponseWriter, result *CartGrain) error { w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-cache") @@ -215,18 +204,35 @@ func (s *PoolServer) HandleConfirmation(w http.ResponseWriter, r *http.Request, return json.NewEncoder(w).Encode(order) } +func getCurrency(country string) string { + if country == "no" { + return "NOK" + } + return "SEK" +} + +func getLocale(country string) string { + if country == "no" { + return "nb-no" + } + return "sv-se" +} + func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOrder, error) { + country := getCountryFromHost(host) meta := &CheckoutMeta{ Terms: fmt.Sprintf("https://%s/terms", host), Checkout: fmt.Sprintf("https://%s/checkout?order_id={checkout.order.id}", host), Confirmation: fmt.Sprintf("https://%s/confirmation/{checkout.order.id}", host), Validation: fmt.Sprintf("https://%s/validate", host), Push: fmt.Sprintf("https://%s/push?order_id={checkout.order.id}", host), - Country: getCountryFromHost(host), + Country: country, + Currency: getCurrency(country), + Locale: getLocale(country), } // Get current grain state (may be local or remote) - grain, err := s.pool.Get(id) + grain, err := s.pool.Get(uint64(id)) if err != nil { return nil, err } @@ -246,7 +252,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) (*CartGrain, error) { // Persist initialization state via mutation (best-effort) - return s.pool.Apply(id, &messages.InitializeCheckout{ + return s.pool.Apply(uint64(id), &messages.InitializeCheckout{ OrderId: klarnaOrder.ID, Status: klarnaOrder.Status, PaymentInProgress: true, @@ -265,8 +271,8 @@ func (s *PoolServer) HandleCheckout(w http.ResponseWriter, r *http.Request, id C return json.NewEncoder(w).Encode(klarnaOrder) } -func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) error { - return func(w http.ResponseWriter, r *http.Request) error { +func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { var id CartId cookie, err := r.Cookie("cartid") if err != nil || cookie.Value == "" { @@ -300,7 +306,13 @@ func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.R } } - return fn(id, w, r) + err = fn(id, w, r) + if err != nil { + log.Printf("Server error, not remote error: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } + } } @@ -321,33 +333,41 @@ func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, ca return nil } -func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) error { - return func(w http.ResponseWriter, r *http.Request) error { +func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + var id CartId raw := r.PathValue("id") // If no id supplied, generate a new one if raw == "" { id := MustNewCartId() w.Header().Set("Set-Cart-Id", id.String()) - - return fn(id, w, r) - } - // Parse base62 cart id - id, ok := ParseCartId(raw) - if !ok { - return fmt.Errorf("invalid cart id format") + } else { + // Parse base62 cart id + if parsedId, ok := ParseCartId(raw); !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("cart id is invalid")) + return + } else { + id = parsedId + } } - return fn(id, w, r) + err := fn(id, w, r) + if err != nil { + log.Printf("Server error, not remote error: %v\n", err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } } } func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error { return func(cartId CartId, w http.ResponseWriter, r *http.Request) error { - if ownerHost, ok := s.pool.OwnerHost(cartId); ok { - handled, err := ownerHost.Proxy(cartId, w, r) + if ownerHost, ok := s.pool.OwnerHost(uint64(cartId)); ok { + handled, err := ownerHost.Proxy(uint64(cartId), w, r) if err != nil { log.Printf("proxy failed: %v, taking ownership", err) - s.pool.TakeOwnership(cartId) + s.pool.TakeOwnership(uint64(cartId)) } else if handled { return nil } @@ -369,29 +389,29 @@ func (s *PoolServer) Serve() *http.ServeMux { w.WriteHeader(http.StatusOK) }) - mux.HandleFunc("GET /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleGet)))) - mux.HandleFunc("GET /add/{sku}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleAddSku)))) - mux.HandleFunc("POST /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleAddRequest)))) - mux.HandleFunc("POST /set", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetCartItems)))) - mux.HandleFunc("DELETE /{itemId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleDeleteItem)))) - mux.HandleFunc("PUT /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleQuantityChange)))) - mux.HandleFunc("DELETE /", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie)))) - mux.HandleFunc("POST /delivery", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))) - mux.HandleFunc("DELETE /delivery/{deliveryId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))) - mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))) - mux.HandleFunc("GET /checkout", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout)))) - mux.HandleFunc("GET /confirmation/{orderId}", ErrorHandler(CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation)))) + mux.HandleFunc("GET /", CookieCartIdHandler(s.ProxyHandler(s.HandleGet))) + mux.HandleFunc("GET /add/{sku}", CookieCartIdHandler(s.ProxyHandler(s.HandleAddSku))) + mux.HandleFunc("POST /", CookieCartIdHandler(s.ProxyHandler(s.HandleAddRequest))) + mux.HandleFunc("POST /set", CookieCartIdHandler(s.ProxyHandler(s.HandleSetCartItems))) + mux.HandleFunc("DELETE /{itemId}", CookieCartIdHandler(s.ProxyHandler(s.HandleDeleteItem))) + mux.HandleFunc("PUT /", CookieCartIdHandler(s.ProxyHandler(s.HandleQuantityChange))) + mux.HandleFunc("DELETE /", CookieCartIdHandler(s.ProxyHandler(s.RemoveCartCookie))) + mux.HandleFunc("POST /delivery", CookieCartIdHandler(s.ProxyHandler(s.HandleSetDelivery))) + mux.HandleFunc("DELETE /delivery/{deliveryId}", CookieCartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery))) + mux.HandleFunc("PUT /delivery/{deliveryId}/pickupPoint", CookieCartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint))) + mux.HandleFunc("GET /checkout", CookieCartIdHandler(s.ProxyHandler(s.HandleCheckout))) + mux.HandleFunc("GET /confirmation/{orderId}", CookieCartIdHandler(s.ProxyHandler(s.HandleConfirmation))) - mux.HandleFunc("GET /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleGet)))) - mux.HandleFunc("GET /byid/{id}/add/{sku}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleAddSku)))) - mux.HandleFunc("POST /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleAddRequest)))) - mux.HandleFunc("DELETE /byid/{id}/{itemId}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleDeleteItem)))) - mux.HandleFunc("PUT /byid/{id}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleQuantityChange)))) - mux.HandleFunc("POST /byid/{id}/delivery", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleSetDelivery)))) - mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery)))) - mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint)))) - mux.HandleFunc("GET /byid/{id}/checkout", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleCheckout)))) - mux.HandleFunc("GET /byid/{id}/confirmation", ErrorHandler(CartIdHandler(s.ProxyHandler(s.HandleConfirmation)))) + mux.HandleFunc("GET /byid/{id}", CartIdHandler(s.ProxyHandler(s.HandleGet))) + mux.HandleFunc("GET /byid/{id}/add/{sku}", CartIdHandler(s.ProxyHandler(s.HandleAddSku))) + mux.HandleFunc("POST /byid/{id}", CartIdHandler(s.ProxyHandler(s.HandleAddRequest))) + mux.HandleFunc("DELETE /byid/{id}/{itemId}", CartIdHandler(s.ProxyHandler(s.HandleDeleteItem))) + mux.HandleFunc("PUT /byid/{id}", CartIdHandler(s.ProxyHandler(s.HandleQuantityChange))) + mux.HandleFunc("POST /byid/{id}/delivery", CartIdHandler(s.ProxyHandler(s.HandleSetDelivery))) + mux.HandleFunc("DELETE /byid/{id}/delivery/{deliveryId}", CartIdHandler(s.ProxyHandler(s.HandleRemoveDelivery))) + mux.HandleFunc("PUT /byid/{id}/delivery/{deliveryId}/pickupPoint", CartIdHandler(s.ProxyHandler(s.HandleSetPickupPoint))) + mux.HandleFunc("GET /byid/{id}/checkout", CartIdHandler(s.ProxyHandler(s.HandleCheckout))) + mux.HandleFunc("GET /byid/{id}/confirmation", CartIdHandler(s.ProxyHandler(s.HandleConfirmation))) return mux } diff --git a/proto/control_plane.proto b/proto/control_plane.proto index 316e46b..2649d94 100644 --- a/proto/control_plane.proto +++ b/proto/control_plane.proto @@ -37,8 +37,8 @@ message NegotiateReply { } // CartIdsReply returns the list of cart IDs (string form) currently owned locally. -message CartIdsReply { - repeated uint64 cart_ids = 1; +message ActorIdsReply { + repeated uint64 ids = 1; } // OwnerChangeAck retained as response type for Closing RPC (ConfirmOwner removed). @@ -56,13 +56,13 @@ message ClosingNotice { // First claim wins; receivers SHOULD NOT overwrite an existing different owner. message OwnershipAnnounce { string host = 1; // announcing host - repeated uint64 cart_ids = 2; // newly claimed cart ids + repeated uint64 ids = 2; // newly claimed cart ids } // ExpiryAnnounce broadcasts that a host evicted the provided cart IDs. message ExpiryAnnounce { string host = 1; - repeated uint64 cart_ids = 2; + repeated uint64 ids = 2; } // ControlPlane defines cluster coordination and ownership operations. @@ -74,7 +74,7 @@ service ControlPlane { rpc Negotiate(NegotiateRequest) returns (NegotiateReply); // GetCartIds lists currently owned cart IDs on this node. - rpc GetCartIds(Empty) returns (CartIdsReply); + rpc GetLocalActorIds(Empty) returns (ActorIdsReply); // ConfirmOwner RPC removed (was legacy ownership acknowledgement; ring-based ownership now authoritative)