package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"reflect"
	"sync"
	"time"

	"git.tornberg.me/go-cart-actor/pkg/actor"
	messages "git.tornberg.me/go-cart-actor/pkg/messages"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

/*
event_log.go

Append-only cart event log (per cart id) with replay + metrics.

Rationale:
  - Enables recovery of in-memory cart state after process restarts or TTL eviction.
  - Provides a chronological mutation trail for auditing / debugging.
  - Avoids write amplification of full snapshots on every mutation.

Format:

	One JSON object per line ("ts" is Unix seconds):
	{ "ts": 1700000000, "type": "AddRequest", "payload": { ... mutation fields ... } }

Concurrency:
  - Appends: synchronized per-cart via an in-process mutex map to avoid partial writes.
  - Replay: sequential read of entire file; mutations applied in order.

Usage Integration (to be wired by caller):
 1. After successful mutation application (non-replay), invoke:
    AppendCartEvent(grain.GetId(), mutation)
 2. During grain spawn, call:
    ReplayCartEvents(grain, grain.GetId(), registry)
    BEFORE serving requests, so state is reconstructed.

Metrics:
  - cart_event_log_appends_total
  - cart_event_log_replay_total
  - cart_event_log_replay_failures_total
  - cart_event_log_bytes_written_total
  - cart_event_log_files_existing (gauge)
  - cart_event_log_last_append_unix (gauge)
  - cart_event_log_replay_duration_seconds (histogram)
  - cart_event_log_unknown_types_total
  - cart_event_log_mutation_errors_total

Rotation / Compaction:
  - Not implemented. If needed, implement size checks and snapshot+truncate later.

Caveats:
  - Mutation schema changes may break replay unless backward-compatible.
  - Missing / unknown event types are skipped (metric incremented).
  - If a mutation fails during replay, replay continues (metric incremented).
*/ var ( eventLogDir = "data" eventAppendsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_appends_total", Help: "Total number of cart mutation events appended to event logs.", }) eventReplayTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_replay_total", Help: "Total number of successful event log replays (per cart).", }) eventReplayFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_replay_failures_total", Help: "Total number of failed event log replay operations.", }) eventBytesWrittenTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_bytes_written_total", Help: "Cumulative number of bytes written to all cart event logs.", }) eventFilesExisting = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_event_log_files_existing", Help: "Number of cart event log files currently existing on disk.", }) eventLastAppendUnix = promauto.NewGauge(prometheus.GaugeOpts{ Name: "cart_event_log_last_append_unix", Help: "Unix timestamp of the last append to any cart event log.", }) eventReplayDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cart_event_log_replay_duration_seconds", Help: "Duration of replay operations per cart in seconds.", Buckets: prometheus.DefBuckets, }) eventUnknownTypesTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_unknown_types_total", Help: "Total number of unknown event types encountered during replay (skipped).", }) eventMutationErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "cart_event_log_mutation_errors_total", Help: "Total number of errors applying mutation events during replay.", }) ) type cartEventRecord struct { Timestamp int64 `json:"ts"` Type string `json:"type"` Payload interface{} `json:"payload"` } // registry of supported mutation payload type constructors var eventTypeFactories = map[string]func() interface{}{ "AddRequest": func() interface{} { return &messages.AddRequest{} }, 
"AddItem": func() interface{} { return &messages.AddItem{} }, "RemoveItem": func() interface{} { return &messages.RemoveItem{} }, "RemoveDelivery": func() interface{} { return &messages.RemoveDelivery{} }, "ChangeQuantity": func() interface{} { return &messages.ChangeQuantity{} }, "SetDelivery": func() interface{} { return &messages.SetDelivery{} }, "SetPickupPoint": func() interface{} { return &messages.SetPickupPoint{} }, "SetCartRequest": func() interface{} { return &messages.SetCartRequest{} }, "OrderCreated": func() interface{} { return &messages.OrderCreated{} }, "InitializeCheckout": func() interface{} { return &messages.InitializeCheckout{} }, } // Per-cart mutexes to serialize append operations (avoid partial overlapping writes) var ( eventLogMu sync.Map // map[string]*sync.Mutex ) // getCartEventMutex returns a mutex for a specific cart id string. func getCartEventMutex(id string) *sync.Mutex { if v, ok := eventLogMu.Load(id); ok { return v.(*sync.Mutex) } m := &sync.Mutex{} actual, _ := eventLogMu.LoadOrStore(id, m) return actual.(*sync.Mutex) } // EventLogPath returns the path to the cart's event log file. func EventLogPath(id CartId) string { return filepath.Join(eventLogDir, fmt.Sprintf("%s.events.log", id.String())) } // EnsureEventLogDirectory ensures base directory exists and updates gauge. func EnsureEventLogDirectory() error { if _, err := os.Stat(eventLogDir); errors.Is(err, os.ErrNotExist) { if err2 := os.MkdirAll(eventLogDir, 0755); err2 != nil { return err2 } } // Update files existing gauge (approximate; counts matching *.events.log) pattern := filepath.Join(eventLogDir, "*.events.log") matches, _ := filepath.Glob(pattern) eventFilesExisting.Set(float64(len(matches))) return nil } // AppendCartEvent appends a mutation event to the cart's log (JSON line). 
func AppendCartEvent(id CartId, mutation interface{}) error { if mutation == nil { return errors.New("nil mutation cannot be logged") } if err := EnsureEventLogDirectory(); err != nil { return err } typ := mutationTypeName(mutation) rec := cartEventRecord{ Timestamp: time.Now().Unix(), Type: typ, Payload: mutation, } lineBytes, err := json.Marshal(rec) if err != nil { return fmt.Errorf("marshal event: %w", err) } lineBytes = append(lineBytes, '\n') path := EventLogPath(id) mtx := getCartEventMutex(id.String()) mtx.Lock() defer mtx.Unlock() fh, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("open event log: %w", err) } defer fh.Close() n, werr := fh.Write(lineBytes) if werr != nil { return fmt.Errorf("write event log: %w", werr) } eventAppendsTotal.Inc() eventBytesWrittenTotal.Add(float64(n)) eventLastAppendUnix.Set(float64(rec.Timestamp)) return nil } // ReplayCartEvents replays an existing cart's event log into the provided grain. // It applies mutation payloads in order, skipping unknown types. 
func ReplayCartEvents(grain *CartGrain, id CartId, registry actor.MutationRegistry) error { start := time.Now() path := EventLogPath(id) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { // No log -> nothing to replay return nil } fh, err := os.Open(path) if err != nil { eventReplayFailuresTotal.Inc() return fmt.Errorf("open replay file: %w", err) } defer fh.Close() scanner := bufio.NewScanner(fh) // Increase buffer in case of large payloads const maxLine = 256 * 1024 buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, maxLine) for scanner.Scan() { line := scanner.Bytes() var raw struct { Timestamp time.Time `json:"ts"` Type string `json:"type"` Payload json.RawMessage `json:"payload"` } if err := json.Unmarshal(line, &raw); err != nil { eventReplayFailuresTotal.Inc() continue // skip malformed line } instance, ok := registry.Create(raw.Type) if !ok { log.Printf("loading failed for unknown mutation type: %s", raw.Type) eventReplayFailuresTotal.Inc() continue // skip unknown type } if err := json.Unmarshal(raw.Payload, instance); err != nil { eventMutationErrorsTotal.Inc() continue } // Apply mutation directly using internal registration (bypass AppendCartEvent recursion). if applyErr := registry.Apply(grain, instance); applyErr != nil { eventMutationErrorsTotal.Inc() continue } else { // Update lastChange to the timestamp of this event (sliding inactivity window support). grain.lastChange = raw.Timestamp } } if serr := scanner.Err(); serr != nil { eventReplayFailuresTotal.Inc() return fmt.Errorf("scanner error: %w", serr) } eventReplayTotal.Inc() eventReplayDuration.Observe(time.Since(start).Seconds()) return nil } // mutationTypeName returns the short struct name for a mutation (pointer aware). func mutationTypeName(v interface{}) string { if v == nil { return "nil" } t := reflect.TypeOf(v) if t.Kind() == reflect.Ptr { t = t.Elem() } return t.Name() } /* Future enhancements: - Compression: gzip large (> N events) logs to reduce disk usage. 
- Compaction: periodic snapshot + truncate old events to bound replay latency. - Checkpoint events: inject cart state snapshots every M mutations. - Integrity: add checksum per line for corruption detection. - Multi-writer safety across processes (currently only safe within one process). */