diff --git a/cart-grain.go b/cart-grain.go index 7f7dd99..8b39bde 100644 --- a/cart-grain.go +++ b/cart-grain.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "sync" + "time" messages "git.tornberg.me/go-cart-actor/proto" ) @@ -89,6 +90,7 @@ type CartGrain struct { mu sync.RWMutex lastItemId int lastDeliveryId int + lastChange int64 // unix seconds of last successful mutation (replay sets from event ts) Id CartId `json:"id"` Items []*CartItem `json:"items"` TotalPrice int64 `json:"totalPrice"` @@ -112,8 +114,7 @@ func (c *CartGrain) GetId() CartId { } func (c *CartGrain) GetLastChange() int64 { - // Legacy event log removed; return 0 to indicate no persisted mutation history. - return 0 + return c.lastChange } func (c *CartGrain) GetCurrentState() (*CartGrain, error) { @@ -269,6 +270,13 @@ func (c *CartGrain) Apply(content interface{}, isReplay bool) (*CartGrain, error } return nil, err } + + // Sliding TTL: update lastChange only for non-replay successful mutations. + if updated != nil && !isReplay { + c.lastChange = time.Now().Unix() + _ = AppendCartEvent(c.Id, content) + } + return updated, nil } diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index bd232c2..a6bd2ec 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -228,10 +228,10 @@ metadata: name: cart-ingress annotations: cert-manager.io/cluster-issuer: letsencrypt-prod - # nginx.ingress.kubernetes.io/affinity: "cookie" - # nginx.ingress.kubernetes.io/session-cookie-name: "cart-affinity" - # nginx.ingress.kubernetes.io/session-cookie-expires: "172800" - # nginx.ingress.kubernetes.io/session-cookie-max-age: "172800" + nginx.ingress.kubernetes.io/affinity: "cookie" + nginx.ingress.kubernetes.io/session-cookie-name: "cart-affinity" + nginx.ingress.kubernetes.io/session-cookie-expires: "172800" + nginx.ingress.kubernetes.io/session-cookie-max-age: "172800" nginx.ingress.kubernetes.io/proxy-body-size: 4m spec: ingressClassName: nginx diff --git a/event_log.go b/event_log.go new file mode 100644 index 0000000..0481459 --- /dev/null +++ b/event_log.go @@ -0,0 +1,288 @@ +package main + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "reflect" + "sync" + "time" + + messages "git.tornberg.me/go-cart-actor/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +/* +event_log.go + +Append-only cart event log (per cart id) with replay + metrics. + +Rationale: + - Enables recovery of in-memory cart state after process restarts or TTL eviction. + - Provides a chronological mutation trail for auditing / debugging. + - Avoids write amplification of full snapshots on every mutation. + +Format: + One JSON object per line: + { + "ts": 1700000000, + "type": "AddRequest", + "payload": { ... mutation fields ... } + } + +Concurrency: + - Appends: synchronized per-cart via an in-process mutex map to avoid partial writes. + - Replay: sequential read of entire file; mutations applied in order. + +Usage Integration (to be wired by caller): + 1. After successful mutation application (non-replay), invoke: + AppendCartEvent(grain.GetId(), mutation) + 2. During grain spawn, call: + ReplayCartEvents(grain, grain.GetId()) + BEFORE serving requests, so state is reconstructed. + +Metrics: + - cart_event_log_appends_total + - cart_event_log_replay_total + - cart_event_log_replay_failures_total + - cart_event_log_bytes_written_total + - cart_event_log_files_existing (gauge) + - cart_event_log_last_append_unix (gauge) + - cart_event_log_replay_duration_seconds (histogram) + +Rotation / Compaction: + - Not implemented. If needed, implement size checks and snapshot+truncate later. + +Caveats: + - Mutation schema changes may break replay unless backward-compatible. + - Missing / unknown event types are skipped (metric incremented). + - If a mutation fails during replay, replay continues (logged + metric). + +*/ + +var ( + eventLogDir = "data" + + eventAppendsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_appends_total", + Help: "Total number of cart mutation events appended to event logs.", + }) + eventReplayTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_replay_total", + Help: "Total number of successful event log replays (per cart).", + }) + eventReplayFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_replay_failures_total", + Help: "Total number of failed event log replay operations.", + }) + eventBytesWrittenTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_bytes_written_total", + Help: "Cumulative number of bytes written to all cart event logs.", + }) + eventFilesExisting = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cart_event_log_files_existing", + Help: "Number of cart event log files currently existing on disk.", + }) + eventLastAppendUnix = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cart_event_log_last_append_unix", + Help: "Unix timestamp of the last append to any cart event log.", + }) + eventReplayDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "cart_event_log_replay_duration_seconds", + Help: "Duration of replay operations per cart in seconds.", + Buckets: prometheus.DefBuckets, + }) + eventUnknownTypesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_unknown_types_total", + Help: "Total number of unknown event types encountered during replay (skipped).", + }) + eventMutationErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cart_event_log_mutation_errors_total", + Help: "Total number of errors applying mutation events during replay.", + }) +) + +type cartEventRecord struct { + Timestamp int64 `json:"ts"` + Type string `json:"type"` + Payload interface{} `json:"payload"` +} + +// registry of supported mutation payload type constructors +var eventTypeFactories = map[string]func() interface{}{ + "AddRequest": func() interface{} { return &messages.AddRequest{} }, + "AddItem": func() interface{} { return &messages.AddItem{} }, + "RemoveItem": func() interface{} { return &messages.RemoveItem{} }, + "RemoveDelivery": func() interface{} { return &messages.RemoveDelivery{} }, + "ChangeQuantity": func() interface{} { return &messages.ChangeQuantity{} }, + "SetDelivery": func() interface{} { return &messages.SetDelivery{} }, + "SetPickupPoint": func() interface{} { return &messages.SetPickupPoint{} }, + "SetCartRequest": func() interface{} { return &messages.SetCartRequest{} }, + "OrderCreated": func() interface{} { return &messages.OrderCreated{} }, + "InitializeCheckout": func() interface{} { return &messages.InitializeCheckout{} }, +} + +// Per-cart mutexes to serialize append operations (avoid partial overlapping writes) +var ( + eventLogMu sync.Map // map[string]*sync.Mutex +) + +// getCartEventMutex returns a mutex for a specific cart id string. +func getCartEventMutex(id string) *sync.Mutex { + if v, ok := eventLogMu.Load(id); ok { + return v.(*sync.Mutex) + } + m := &sync.Mutex{} + actual, _ := eventLogMu.LoadOrStore(id, m) + return actual.(*sync.Mutex) +} + +// EventLogPath returns the path to the cart's event log file. +func EventLogPath(id CartId) string { + return filepath.Join(eventLogDir, fmt.Sprintf("%s.events.log", id.String())) +} + +// EnsureEventLogDirectory ensures base directory exists and updates gauge. +func EnsureEventLogDirectory() error { + if _, err := os.Stat(eventLogDir); errors.Is(err, os.ErrNotExist) { + if err2 := os.MkdirAll(eventLogDir, 0755); err2 != nil { + return err2 + } + } + // Update files existing gauge (approximate; counts matching *.events.log) + pattern := filepath.Join(eventLogDir, "*.events.log") + matches, _ := filepath.Glob(pattern) + eventFilesExisting.Set(float64(len(matches))) + return nil +} + +// AppendCartEvent appends a mutation event to the cart's log (JSON line). +func AppendCartEvent(id CartId, mutation interface{}) error { + if mutation == nil { + return errors.New("nil mutation cannot be logged") + } + if err := EnsureEventLogDirectory(); err != nil { + return err + } + + typ := mutationTypeName(mutation) + rec := cartEventRecord{ + Timestamp: time.Now().Unix(), + Type: typ, + Payload: mutation, + } + lineBytes, err := json.Marshal(rec) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + lineBytes = append(lineBytes, '\n') + + path := EventLogPath(id) + mtx := getCartEventMutex(id.String()) + mtx.Lock() + defer mtx.Unlock() + + fh, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("open event log: %w", err) + } + defer fh.Close() + + n, werr := fh.Write(lineBytes) + if werr != nil { + return fmt.Errorf("write event log: %w", werr) + } + + eventAppendsTotal.Inc() + eventBytesWrittenTotal.Add(float64(n)) + eventLastAppendUnix.Set(float64(rec.Timestamp)) + return nil +} + +// ReplayCartEvents replays an existing cart's event log into the provided grain. +// It applies mutation payloads in order, skipping unknown types. +func ReplayCartEvents(grain *CartGrain, id CartId) error { + start := time.Now() + path := EventLogPath(id) + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + // No log -> nothing to replay + return nil + } + + fh, err := os.Open(path) + if err != nil { + eventReplayFailuresTotal.Inc() + return fmt.Errorf("open replay file: %w", err) + } + defer fh.Close() + + scanner := bufio.NewScanner(fh) + // Increase buffer in case of large payloads + const maxLine = 256 * 1024 + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, maxLine) + + for scanner.Scan() { + line := scanner.Bytes() + var raw struct { + Timestamp int64 `json:"ts"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + } + if err := json.Unmarshal(line, &raw); err != nil { + eventReplayFailuresTotal.Inc() + continue // skip malformed line + } + factory, ok := eventTypeFactories[raw.Type] + if !ok { + eventUnknownTypesTotal.Inc() + continue // skip unknown mutation type + } + instance := factory() + if err := json.Unmarshal(raw.Payload, instance); err != nil { + eventMutationErrorsTotal.Inc() + continue + } + // Apply mutation directly using internal registration (bypass AppendCartEvent recursion). + if _, applyErr := ApplyRegistered(grain, instance); applyErr != nil { + eventMutationErrorsTotal.Inc() + continue + } else { + // Update lastChange to the timestamp of this event (sliding inactivity window support). + grain.lastChange = raw.Timestamp + } + } + if serr := scanner.Err(); serr != nil { + eventReplayFailuresTotal.Inc() + return fmt.Errorf("scanner error: %w", serr) + } + + eventReplayTotal.Inc() + eventReplayDuration.Observe(time.Since(start).Seconds()) + return nil +} + +// mutationTypeName returns the short struct name for a mutation (pointer aware). +func mutationTypeName(v interface{}) string { + if v == nil { + return "nil" + } + t := reflect.TypeOf(v) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Name() +} + +/* +Future enhancements: + - Compression: gzip large (> N events) logs to reduce disk usage. + - Compaction: periodic snapshot + truncate old events to bound replay latency. + - Checkpoint events: inject cart state snapshots every M mutations. + - Integrity: add checksum per line for corruption detection. + - Multi-writer safety across processes (currently only safe within one process). +*/ diff --git a/grain-pool.go b/grain-pool.go index 55b4806..4009a17 100644 --- a/grain-pool.go +++ b/grain-pool.go @@ -157,6 +157,20 @@ func (p *GrainLocalPool) Purge() { } } +// RefreshExpiry resets the expiry timestamp for a living grain to now + TTL. +// Called after successful mutations to implement a sliding inactivity window. +func (p *GrainLocalPool) RefreshExpiry(id CartId) { + p.mu.Lock() + defer p.mu.Unlock() + for i := range p.expiry { + g := p.expiry[i].Grain + if g != nil && g.Id == id { + p.expiry[i].Expires = time.Now().Add(p.Ttl) + break + } + } +} + // GetGrains returns a legacy view of grains (copy) for compatibility. func (p *GrainLocalPool) GetGrains() map[CartId]*CartGrain { p.mu.RLock() @@ -225,7 +239,12 @@ func (p *GrainLocalPool) Apply(id CartId, mutation interface{}) (*CartGrain, err if err != nil || grain == nil { return nil, err } - return grain.Apply(mutation, false) + result, applyErr := grain.Apply(mutation, false) + // Sliding TTL: refresh expiry on successful non-replay mutation (Apply always non-replay here) + if applyErr == nil && result != nil { + p.RefreshExpiry(id) + } + return result, applyErr } // Get returns current state (legacy wrapper). diff --git a/main.go b/main.go index f7b6a36..0517fc9 100644 --- a/main.go +++ b/main.go @@ -43,11 +43,16 @@ func spawn(id CartId) (*CartGrain, error) { Deliveries: []*CartDelivery{}, Id: id, Items: []*CartItem{}, - // storageMessages removed (legacy event log deprecated) - TotalPrice: 0, + TotalPrice: 0, } - err := loadMessages(ret, id) - return ret, err + // Set baseline lastChange at spawn; replay may update it to last event timestamp. + ret.lastChange = time.Now().Unix() + + // Legacy loadMessages (no-op) retained; then replay append-only event log + _ = loadMessages(ret, id) + _ = ReplayCartEvents(ret, id) + + return ret, nil } func init() { @@ -160,7 +165,7 @@ func main() { log.Printf("Error loading state: %v\n", err) } app := &App{ - pool: NewGrainLocalPool(65535, 5*time.Minute, spawn), + pool: NewGrainLocalPool(65535, 2*time.Hour, spawn), storage: storage, } @@ -384,11 +389,15 @@ func main() { } func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutOrder) error { - _, err = syncedServer.pool.Apply(ToCartId(order.MerchantReference1), &messages.OrderCreated{ + mutation := &messages.OrderCreated{ OrderId: order.ID, Status: order.Status, - }) - return err + } + _, applyErr := syncedServer.pool.Apply(ToCartId(order.MerchantReference1), mutation) + if applyErr == nil { + _ = AppendCartEvent(ToCartId(order.MerchantReference1), mutation) + } + return applyErr } func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {