package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"sync"
	"time"

	messages "git.tornberg.me/go-cart-actor/pkg/messages"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

/*
event_log.go

Append-only cart event log (per cart id) with replay + metrics.

Rationale:
- Enables recovery of in-memory cart state after process restarts or TTL eviction.
- Provides a chronological mutation trail for auditing and debugging.
- Avoids the write amplification of persisting a full snapshot on every mutation.

Format:
One JSON object per line:
{
  "ts": 1700000000,
  "type": "AddRequest",
  "payload": { ... mutation fields ... }
}

Concurrency:
- Appends: serialized per cart via an in-process mutex map to avoid interleaved partial writes.
- Replay: sequential read of the entire file; mutations are applied in order.

Usage Integration (to be wired by the caller; illustrative sketches follow this comment and AppendCartEvent below):
1. After successful mutation application (non-replay), invoke:
   AppendCartEvent(grain.GetId(), mutation)
2. During grain spawn, call:
   ReplayCartEvents(grain, grain.GetId())
   BEFORE serving requests, so state is reconstructed.

Metrics:
- cart_event_log_appends_total
- cart_event_log_replay_total
- cart_event_log_replay_failures_total
- cart_event_log_bytes_written_total
- cart_event_log_unknown_types_total
- cart_event_log_mutation_errors_total
- cart_event_log_files_existing (gauge)
- cart_event_log_last_append_unix (gauge)
- cart_event_log_replay_duration_seconds (histogram)

Rotation / Compaction:
- Not implemented. If needed, add size checks and snapshot+truncate later.

Caveats:
- Mutation schema changes may break replay unless they are backward-compatible.
- Unknown event types are skipped (metric incremented).
- If a mutation fails to apply during replay, the event is skipped and replay continues (metric incremented).
*/
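
// The following sketch illustrates the spawn-time wiring described above
// (Usage Integration, step 2). It is illustrative only: the constructor name
// newCartGrain is hypothetical and should be replaced with however this
// package actually creates a CartGrain.
//
//	func spawnCartGrain(id CartId) (*CartGrain, error) {
//		grain := newCartGrain(id) // hypothetical constructor
//		// Rebuild in-memory state from the event log before serving requests.
//		if err := ReplayCartEvents(grain, id); err != nil {
//			return nil, err
//		}
//		return grain, nil
//	}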

var (
	eventLogDir = "data"

	eventAppendsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_appends_total",
		Help: "Total number of cart mutation events appended to event logs.",
	})
	eventReplayTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_replay_total",
		Help: "Total number of successful event log replays (per cart).",
	})
	eventReplayFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_replay_failures_total",
		Help: "Total number of failed event log replay operations.",
	})
	eventBytesWrittenTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_bytes_written_total",
		Help: "Cumulative number of bytes written to all cart event logs.",
	})
	eventFilesExisting = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_event_log_files_existing",
		Help: "Number of cart event log files currently existing on disk.",
	})
	eventLastAppendUnix = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "cart_event_log_last_append_unix",
		Help: "Unix timestamp of the last append to any cart event log.",
	})
	eventReplayDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "cart_event_log_replay_duration_seconds",
		Help:    "Duration of replay operations per cart in seconds.",
		Buckets: prometheus.DefBuckets,
	})
	eventUnknownTypesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_unknown_types_total",
		Help: "Total number of unknown event types encountered during replay (skipped).",
	})
	eventMutationErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cart_event_log_mutation_errors_total",
		Help: "Total number of errors applying mutation events during replay.",
	})
)

// cartEventRecord is the JSON envelope written as one line per appended event.
type cartEventRecord struct {
	Timestamp int64       `json:"ts"`
	Type      string      `json:"type"`
	Payload   interface{} `json:"payload"`
}

// registry of supported mutation payload type constructors
var eventTypeFactories = map[string]func() interface{}{
	"AddRequest":         func() interface{} { return &messages.AddRequest{} },
	"AddItem":            func() interface{} { return &messages.AddItem{} },
	"RemoveItem":         func() interface{} { return &messages.RemoveItem{} },
	"RemoveDelivery":     func() interface{} { return &messages.RemoveDelivery{} },
	"ChangeQuantity":     func() interface{} { return &messages.ChangeQuantity{} },
	"SetDelivery":        func() interface{} { return &messages.SetDelivery{} },
	"SetPickupPoint":     func() interface{} { return &messages.SetPickupPoint{} },
	"SetCartRequest":     func() interface{} { return &messages.SetCartRequest{} },
	"OrderCreated":       func() interface{} { return &messages.OrderCreated{} },
	"InitializeCheckout": func() interface{} { return &messages.InitializeCheckout{} },
}
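
// To make a new mutation type replayable, add an entry to the map above, e.g.
// (illustrative only; ClearCart is a hypothetical message type):
//
//	"ClearCart": func() interface{} { return &messages.ClearCart{} },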

// Per-cart mutexes to serialize append operations (avoid partial overlapping writes)
var (
	eventLogMu sync.Map // map[string]*sync.Mutex
)

// getCartEventMutex returns a mutex for a specific cart id string.
func getCartEventMutex(id string) *sync.Mutex {
	if v, ok := eventLogMu.Load(id); ok {
		return v.(*sync.Mutex)
	}
	m := &sync.Mutex{}
	actual, _ := eventLogMu.LoadOrStore(id, m)
	return actual.(*sync.Mutex)
}

// EventLogPath returns the path to the cart's event log file.
func EventLogPath(id CartId) string {
	return filepath.Join(eventLogDir, fmt.Sprintf("%s.events.log", id.String()))
}

// EnsureEventLogDirectory ensures the base directory exists and updates the file-count gauge.
func EnsureEventLogDirectory() error {
	// MkdirAll is a no-op if the directory already exists and surfaces any
	// other error (e.g. permissions) instead of silently ignoring it.
	if err := os.MkdirAll(eventLogDir, 0755); err != nil {
		return err
	}
	// Update files existing gauge (approximate; counts matching *.events.log).
	pattern := filepath.Join(eventLogDir, "*.events.log")
	matches, _ := filepath.Glob(pattern)
	eventFilesExisting.Set(float64(len(matches)))
	return nil
}

// AppendCartEvent appends a mutation event to the cart's log as a single JSON line.
func AppendCartEvent(id CartId, mutation interface{}) error {
	if mutation == nil {
		return errors.New("nil mutation cannot be logged")
	}
	if err := EnsureEventLogDirectory(); err != nil {
		return err
	}

	typ := mutationTypeName(mutation)
	rec := cartEventRecord{
		Timestamp: time.Now().Unix(),
		Type:      typ,
		Payload:   mutation,
	}
	lineBytes, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	lineBytes = append(lineBytes, '\n')

	path := EventLogPath(id)
	mtx := getCartEventMutex(id.String())
	mtx.Lock()
	defer mtx.Unlock()

	fh, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("open event log: %w", err)
	}

	n, werr := fh.Write(lineBytes)
	if werr != nil {
		fh.Close()
		return fmt.Errorf("write event log: %w", werr)
	}
	// Check the Close error as well: for a durability log, a failed flush on
	// close would otherwise go unnoticed.
	if cerr := fh.Close(); cerr != nil {
		return fmt.Errorf("close event log: %w", cerr)
	}

	eventAppendsTotal.Inc()
	eventBytesWrittenTotal.Add(float64(n))
	eventLastAppendUnix.Set(float64(rec.Timestamp))
	return nil
}
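
// applyAndLog is a minimal sketch of step 1 of the usage integration described
// at the top of this file: apply a mutation to the grain and, only if it was
// applied successfully, append it to the event log. The name is illustrative,
// and it assumes ApplyRegistered (used by ReplayCartEvents below) is the same
// dispatcher callers use for live mutations; adapt the wiring if that
// assumption does not hold.
func applyAndLog(grain *CartGrain, mutation interface{}) error {
	if _, err := ApplyRegistered(grain, mutation); err != nil {
		return err
	}
	// Persist the mutation only after it has been applied, so the log never
	// contains events the in-memory state rejected.
	return AppendCartEvent(grain.GetId(), mutation)
}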

// ReplayCartEvents replays an existing cart's event log into the provided grain.
// It applies mutation payloads in order, skipping unknown types.
func ReplayCartEvents(grain *CartGrain, id CartId) error {
	start := time.Now()
	path := EventLogPath(id)
	if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
		// No log -> nothing to replay.
		return nil
	}

	fh, err := os.Open(path)
	if err != nil {
		eventReplayFailuresTotal.Inc()
		return fmt.Errorf("open replay file: %w", err)
	}
	defer fh.Close()

	scanner := bufio.NewScanner(fh)
	// Increase the scanner buffer in case of large payloads.
	const maxLine = 256 * 1024
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, maxLine)

	for scanner.Scan() {
		line := scanner.Bytes()
		var raw struct {
			// The append side writes "ts" as a Unix timestamp (int64), so it
			// must be decoded as an int64 here, not as time.Time.
			Timestamp int64           `json:"ts"`
			Type      string          `json:"type"`
			Payload   json.RawMessage `json:"payload"`
		}
		if err := json.Unmarshal(line, &raw); err != nil {
			eventReplayFailuresTotal.Inc()
			continue // skip malformed line
		}
		factory, ok := eventTypeFactories[raw.Type]
		if !ok {
			eventUnknownTypesTotal.Inc()
			continue // skip unknown mutation type
		}
		instance := factory()
		if err := json.Unmarshal(raw.Payload, instance); err != nil {
			eventMutationErrorsTotal.Inc()
			continue
		}
		// Apply mutation directly using internal registration (bypass AppendCartEvent recursion).
		if _, applyErr := ApplyRegistered(grain, instance); applyErr != nil {
			eventMutationErrorsTotal.Inc()
			continue
		}
		// Update lastChange to the timestamp of this event (sliding inactivity window support).
		grain.lastChange = time.Unix(raw.Timestamp, 0)
	}
	if serr := scanner.Err(); serr != nil {
		eventReplayFailuresTotal.Inc()
		return fmt.Errorf("scanner error: %w", serr)
	}

	eventReplayTotal.Inc()
	eventReplayDuration.Observe(time.Since(start).Seconds())
	return nil
}

// mutationTypeName returns the short struct name for a mutation (pointer aware).
func mutationTypeName(v interface{}) string {
	if v == nil {
		return "nil"
	}
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

/*
Future enhancements:
- Compression: gzip large (> N events) logs to reduce disk usage.
- Compaction: periodic snapshot + truncate of old events to bound replay latency.
- Checkpoint events: inject cart state snapshots every M mutations.
- Integrity: add a per-line checksum for corruption detection (see the sketch below).
- Multi-writer safety across processes (currently only safe within one process).
*/
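
// Sketch of the per-line checksum idea above (not implemented). Each line could
// carry a CRC32 of its event bytes so corrupted lines can be detected and
// skipped during replay; the field names below are illustrative only.
//
//	crc := crc32.ChecksumIEEE(eventBytes) // hash/crc32
//	line := fmt.Sprintf(`{"crc":%d,"event":%s}`, crc, eventBytes)
//	// On replay: recompute the CRC of "event" and compare before unmarshalling.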