some strange storage stuff
This commit is contained in:
288
event_log.go
Normal file
288
event_log.go
Normal file
@@ -0,0 +1,288 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
messages "git.tornberg.me/go-cart-actor/proto"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
/*
|
||||
event_log.go
|
||||
|
||||
Append-only cart event log (per cart id) with replay + metrics.
|
||||
|
||||
Rationale:
|
||||
- Enables recovery of in-memory cart state after process restarts or TTL eviction.
|
||||
- Provides a chronological mutation trail for auditing / debugging.
|
||||
- Avoids write amplification of full snapshots on every mutation.
|
||||
|
||||
Format:
|
||||
One JSON object per line:
|
||||
{
|
||||
"ts": 1700000000,
|
||||
"type": "AddRequest",
|
||||
"payload": { ... mutation fields ... }
|
||||
}
|
||||
|
||||
Concurrency:
|
||||
- Appends: synchronized per-cart via an in-process mutex map to avoid partial writes.
|
||||
- Replay: sequential read of entire file; mutations applied in order.
|
||||
|
||||
Usage Integration (to be wired by caller):
|
||||
1. After successful mutation application (non-replay), invoke:
|
||||
AppendCartEvent(grain.GetId(), mutation)
|
||||
2. During grain spawn, call:
|
||||
ReplayCartEvents(grain, grain.GetId())
|
||||
BEFORE serving requests, so state is reconstructed.
|
||||
|
||||
Metrics:
|
||||
- cart_event_log_appends_total
|
||||
- cart_event_log_replay_total
|
||||
- cart_event_log_replay_failures_total
|
||||
- cart_event_log_bytes_written_total
|
||||
- cart_event_log_files_existing (gauge)
|
||||
- cart_event_log_last_append_unix (gauge)
|
||||
- cart_event_log_replay_duration_seconds (histogram)
|
||||
|
||||
Rotation / Compaction:
|
||||
- Not implemented. If needed, implement size checks and snapshot+truncate later.
|
||||
|
||||
Caveats:
|
||||
- Mutation schema changes may break replay unless backward-compatible.
|
||||
- Missing / unknown event types are skipped (metric incremented).
|
||||
- If a mutation fails during replay, replay continues (logged + metric).
|
||||
|
||||
*/
|
||||
|
||||
var (
|
||||
eventLogDir = "data"
|
||||
|
||||
eventAppendsTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_appends_total",
|
||||
Help: "Total number of cart mutation events appended to event logs.",
|
||||
})
|
||||
eventReplayTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_replay_total",
|
||||
Help: "Total number of successful event log replays (per cart).",
|
||||
})
|
||||
eventReplayFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_replay_failures_total",
|
||||
Help: "Total number of failed event log replay operations.",
|
||||
})
|
||||
eventBytesWrittenTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_bytes_written_total",
|
||||
Help: "Cumulative number of bytes written to all cart event logs.",
|
||||
})
|
||||
eventFilesExisting = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "cart_event_log_files_existing",
|
||||
Help: "Number of cart event log files currently existing on disk.",
|
||||
})
|
||||
eventLastAppendUnix = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "cart_event_log_last_append_unix",
|
||||
Help: "Unix timestamp of the last append to any cart event log.",
|
||||
})
|
||||
eventReplayDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "cart_event_log_replay_duration_seconds",
|
||||
Help: "Duration of replay operations per cart in seconds.",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
eventUnknownTypesTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_unknown_types_total",
|
||||
Help: "Total number of unknown event types encountered during replay (skipped).",
|
||||
})
|
||||
eventMutationErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "cart_event_log_mutation_errors_total",
|
||||
Help: "Total number of errors applying mutation events during replay.",
|
||||
})
|
||||
)
|
||||
|
||||
type cartEventRecord struct {
|
||||
Timestamp int64 `json:"ts"`
|
||||
Type string `json:"type"`
|
||||
Payload interface{} `json:"payload"`
|
||||
}
|
||||
|
||||
// registry of supported mutation payload type constructors
|
||||
var eventTypeFactories = map[string]func() interface{}{
|
||||
"AddRequest": func() interface{} { return &messages.AddRequest{} },
|
||||
"AddItem": func() interface{} { return &messages.AddItem{} },
|
||||
"RemoveItem": func() interface{} { return &messages.RemoveItem{} },
|
||||
"RemoveDelivery": func() interface{} { return &messages.RemoveDelivery{} },
|
||||
"ChangeQuantity": func() interface{} { return &messages.ChangeQuantity{} },
|
||||
"SetDelivery": func() interface{} { return &messages.SetDelivery{} },
|
||||
"SetPickupPoint": func() interface{} { return &messages.SetPickupPoint{} },
|
||||
"SetCartRequest": func() interface{} { return &messages.SetCartRequest{} },
|
||||
"OrderCreated": func() interface{} { return &messages.OrderCreated{} },
|
||||
"InitializeCheckout": func() interface{} { return &messages.InitializeCheckout{} },
|
||||
}
|
||||
|
||||
// Per-cart mutexes to serialize append operations (avoid partial overlapping writes)
|
||||
var (
|
||||
eventLogMu sync.Map // map[string]*sync.Mutex
|
||||
)
|
||||
|
||||
// getCartEventMutex returns a mutex for a specific cart id string.
|
||||
func getCartEventMutex(id string) *sync.Mutex {
|
||||
if v, ok := eventLogMu.Load(id); ok {
|
||||
return v.(*sync.Mutex)
|
||||
}
|
||||
m := &sync.Mutex{}
|
||||
actual, _ := eventLogMu.LoadOrStore(id, m)
|
||||
return actual.(*sync.Mutex)
|
||||
}
|
||||
|
||||
// EventLogPath returns the path to the cart's event log file.
|
||||
func EventLogPath(id CartId) string {
|
||||
return filepath.Join(eventLogDir, fmt.Sprintf("%s.events.log", id.String()))
|
||||
}
|
||||
|
||||
// EnsureEventLogDirectory ensures base directory exists and updates gauge.
|
||||
func EnsureEventLogDirectory() error {
|
||||
if _, err := os.Stat(eventLogDir); errors.Is(err, os.ErrNotExist) {
|
||||
if err2 := os.MkdirAll(eventLogDir, 0755); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
}
|
||||
// Update files existing gauge (approximate; counts matching *.events.log)
|
||||
pattern := filepath.Join(eventLogDir, "*.events.log")
|
||||
matches, _ := filepath.Glob(pattern)
|
||||
eventFilesExisting.Set(float64(len(matches)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// AppendCartEvent appends a mutation event to the cart's log (JSON line).
|
||||
func AppendCartEvent(id CartId, mutation interface{}) error {
|
||||
if mutation == nil {
|
||||
return errors.New("nil mutation cannot be logged")
|
||||
}
|
||||
if err := EnsureEventLogDirectory(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
typ := mutationTypeName(mutation)
|
||||
rec := cartEventRecord{
|
||||
Timestamp: time.Now().Unix(),
|
||||
Type: typ,
|
||||
Payload: mutation,
|
||||
}
|
||||
lineBytes, err := json.Marshal(rec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal event: %w", err)
|
||||
}
|
||||
lineBytes = append(lineBytes, '\n')
|
||||
|
||||
path := EventLogPath(id)
|
||||
mtx := getCartEventMutex(id.String())
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
|
||||
fh, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open event log: %w", err)
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
n, werr := fh.Write(lineBytes)
|
||||
if werr != nil {
|
||||
return fmt.Errorf("write event log: %w", werr)
|
||||
}
|
||||
|
||||
eventAppendsTotal.Inc()
|
||||
eventBytesWrittenTotal.Add(float64(n))
|
||||
eventLastAppendUnix.Set(float64(rec.Timestamp))
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReplayCartEvents replays an existing cart's event log into the provided grain.
|
||||
// It applies mutation payloads in order, skipping unknown types.
|
||||
func ReplayCartEvents(grain *CartGrain, id CartId) error {
|
||||
start := time.Now()
|
||||
path := EventLogPath(id)
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
// No log -> nothing to replay
|
||||
return nil
|
||||
}
|
||||
|
||||
fh, err := os.Open(path)
|
||||
if err != nil {
|
||||
eventReplayFailuresTotal.Inc()
|
||||
return fmt.Errorf("open replay file: %w", err)
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
scanner := bufio.NewScanner(fh)
|
||||
// Increase buffer in case of large payloads
|
||||
const maxLine = 256 * 1024
|
||||
buf := make([]byte, 0, 64*1024)
|
||||
scanner.Buffer(buf, maxLine)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
var raw struct {
|
||||
Timestamp int64 `json:"ts"`
|
||||
Type string `json:"type"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
if err := json.Unmarshal(line, &raw); err != nil {
|
||||
eventReplayFailuresTotal.Inc()
|
||||
continue // skip malformed line
|
||||
}
|
||||
factory, ok := eventTypeFactories[raw.Type]
|
||||
if !ok {
|
||||
eventUnknownTypesTotal.Inc()
|
||||
continue // skip unknown mutation type
|
||||
}
|
||||
instance := factory()
|
||||
if err := json.Unmarshal(raw.Payload, instance); err != nil {
|
||||
eventMutationErrorsTotal.Inc()
|
||||
continue
|
||||
}
|
||||
// Apply mutation directly using internal registration (bypass AppendCartEvent recursion).
|
||||
if _, applyErr := ApplyRegistered(grain, instance); applyErr != nil {
|
||||
eventMutationErrorsTotal.Inc()
|
||||
continue
|
||||
} else {
|
||||
// Update lastChange to the timestamp of this event (sliding inactivity window support).
|
||||
grain.lastChange = raw.Timestamp
|
||||
}
|
||||
}
|
||||
if serr := scanner.Err(); serr != nil {
|
||||
eventReplayFailuresTotal.Inc()
|
||||
return fmt.Errorf("scanner error: %w", serr)
|
||||
}
|
||||
|
||||
eventReplayTotal.Inc()
|
||||
eventReplayDuration.Observe(time.Since(start).Seconds())
|
||||
return nil
|
||||
}
|
||||
|
||||
// mutationTypeName returns the short struct name for a mutation (pointer aware).
|
||||
func mutationTypeName(v interface{}) string {
|
||||
if v == nil {
|
||||
return "nil"
|
||||
}
|
||||
t := reflect.TypeOf(v)
|
||||
if t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
return t.Name()
|
||||
}
|
||||
|
||||
/*
|
||||
Future enhancements:
|
||||
- Compression: gzip large (> N events) logs to reduce disk usage.
|
||||
- Compaction: periodic snapshot + truncate old events to bound replay latency.
|
||||
- Checkpoint events: inject cart state snapshots every M mutations.
|
||||
- Integrity: add checksum per line for corruption detection.
|
||||
- Multi-writer safety across processes (currently only safe within one process).
|
||||
*/
|
||||
Reference in New Issue
Block a user