cleanup
All checks were successful
Build and Publish / Metadata (push) Successful in 9s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m28s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m1s

This commit is contained in:
matst80
2025-10-13 17:39:07 +02:00
parent 2bf0475335
commit 6fbd62936f
13 changed files with 274 additions and 527 deletions

View File

@@ -1,291 +0,0 @@
package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"reflect"
"sync"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
/*
event_log.go
Append-only cart event log (per cart id) with replay + metrics.
Rationale:
- Enables recovery of in-memory cart state after process restarts or TTL eviction.
- Provides a chronological mutation trail for auditing / debugging.
- Avoids write amplification of full snapshots on every mutation.
Format:
One JSON object per line:
{
"ts": 1700000000,
"type": "AddRequest",
"payload": { ... mutation fields ... }
}
Concurrency:
- Appends: synchronized per-cart via an in-process mutex map to avoid partial writes.
- Replay: sequential read of entire file; mutations applied in order.
Usage Integration (to be wired by the caller):
1. After a successful mutation application (non-replay), invoke:
   AppendCartEvent(grain.GetId(), mutation)
2. During grain spawn, call:
   ReplayCartEvents(grain, grain.GetId(), registry)
   BEFORE serving requests, so state is reconstructed (an illustrative sketch follows).
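Illustrative wiring (a hedged sketch; SpawnCartGrain and the registry value are
assumed caller-side names, not defined in this file):
  grain := SpawnCartGrain(id)                      // hypothetical spawn hook
  if err := ReplayCartEvents(grain, id, registry); err != nil {
      log.Printf("replay failed for cart %s: %v", id.String(), err)
  }
  // ...serve requests; after each successfully applied (non-replay) mutation:
  _ = AppendCartEvent(id, mutation)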
Metrics:
- cart_event_log_appends_total
- cart_event_log_replay_total
- cart_event_log_replay_failures_total
- cart_event_log_bytes_written_total
- cart_event_log_files_existing (gauge)
- cart_event_log_last_append_unix (gauge)
- cart_event_log_replay_duration_seconds (histogram)
Rotation / Compaction:
- Not implemented. If needed, implement size checks and snapshot+truncate later.
Caveats:
- Mutation schema changes may break replay unless backward-compatible.
- Missing / unknown event types are skipped (metric incremented).
- If a mutation fails during replay, replay continues (logged + metric).
*/
var (
eventLogDir = "data"
eventAppendsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_appends_total",
Help: "Total number of cart mutation events appended to event logs.",
})
eventReplayTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_replay_total",
Help: "Total number of successful event log replays (per cart).",
})
eventReplayFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_replay_failures_total",
Help: "Total number of failed event log replay operations.",
})
eventBytesWrittenTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_bytes_written_total",
Help: "Cumulative number of bytes written to all cart event logs.",
})
eventFilesExisting = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_event_log_files_existing",
Help: "Number of cart event log files currently existing on disk.",
})
eventLastAppendUnix = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cart_event_log_last_append_unix",
Help: "Unix timestamp of the last append to any cart event log.",
})
eventReplayDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "cart_event_log_replay_duration_seconds",
Help: "Duration of replay operations per cart in seconds.",
Buckets: prometheus.DefBuckets,
})
eventUnknownTypesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_unknown_types_total",
Help: "Total number of unknown event types encountered during replay (skipped).",
})
eventMutationErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_event_log_mutation_errors_total",
Help: "Total number of errors applying mutation events during replay.",
})
)
type cartEventRecord struct {
Timestamp int64 `json:"ts"`
Type string `json:"type"`
Payload interface{} `json:"payload"`
}
// registry of supported mutation payload type constructors
var eventTypeFactories = map[string]func() interface{}{
"AddRequest": func() interface{} { return &messages.AddRequest{} },
"AddItem": func() interface{} { return &messages.AddItem{} },
"RemoveItem": func() interface{} { return &messages.RemoveItem{} },
"RemoveDelivery": func() interface{} { return &messages.RemoveDelivery{} },
"ChangeQuantity": func() interface{} { return &messages.ChangeQuantity{} },
"SetDelivery": func() interface{} { return &messages.SetDelivery{} },
"SetPickupPoint": func() interface{} { return &messages.SetPickupPoint{} },
"SetCartRequest": func() interface{} { return &messages.SetCartRequest{} },
"OrderCreated": func() interface{} { return &messages.OrderCreated{} },
"InitializeCheckout": func() interface{} { return &messages.InitializeCheckout{} },
}
// Per-cart mutexes to serialize append operations (avoid partial overlapping writes)
var (
eventLogMu sync.Map // map[string]*sync.Mutex
)
// getCartEventMutex returns a mutex for a specific cart id string.
func getCartEventMutex(id string) *sync.Mutex {
if v, ok := eventLogMu.Load(id); ok {
return v.(*sync.Mutex)
}
m := &sync.Mutex{}
actual, _ := eventLogMu.LoadOrStore(id, m)
return actual.(*sync.Mutex)
}
// EventLogPath returns the path to the cart's event log file.
func EventLogPath(id CartId) string {
return filepath.Join(eventLogDir, fmt.Sprintf("%s.events.log", id.String()))
}
// EnsureEventLogDirectory ensures base directory exists and updates gauge.
func EnsureEventLogDirectory() error {
if _, err := os.Stat(eventLogDir); errors.Is(err, os.ErrNotExist) {
if err2 := os.MkdirAll(eventLogDir, 0755); err2 != nil {
return err2
}
}
// Update files existing gauge (approximate; counts matching *.events.log)
pattern := filepath.Join(eventLogDir, "*.events.log")
matches, _ := filepath.Glob(pattern)
eventFilesExisting.Set(float64(len(matches)))
return nil
}
// AppendCartEvent appends a mutation event to the cart's log (JSON line).
func AppendCartEvent(id CartId, mutation interface{}) error {
if mutation == nil {
return errors.New("nil mutation cannot be logged")
}
if err := EnsureEventLogDirectory(); err != nil {
return err
}
typ := mutationTypeName(mutation)
rec := cartEventRecord{
Timestamp: time.Now().Unix(),
Type: typ,
Payload: mutation,
}
lineBytes, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
lineBytes = append(lineBytes, '\n')
path := EventLogPath(id)
mtx := getCartEventMutex(id.String())
mtx.Lock()
defer mtx.Unlock()
fh, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open event log: %w", err)
}
defer fh.Close()
n, werr := fh.Write(lineBytes)
if werr != nil {
return fmt.Errorf("write event log: %w", werr)
}
eventAppendsTotal.Inc()
eventBytesWrittenTotal.Add(float64(n))
eventLastAppendUnix.Set(float64(rec.Timestamp))
return nil
}
// ReplayCartEvents replays an existing cart's event log into the provided grain.
// It applies mutation payloads in order, skipping unknown types.
func ReplayCartEvents(grain *CartGrain, id CartId, registry actor.MutationRegistry) error {
start := time.Now()
path := EventLogPath(id)
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
// No log -> nothing to replay
return nil
}
fh, err := os.Open(path)
if err != nil {
eventReplayFailuresTotal.Inc()
return fmt.Errorf("open replay file: %w", err)
}
defer fh.Close()
scanner := bufio.NewScanner(fh)
// Increase buffer in case of large payloads
const maxLine = 256 * 1024
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, maxLine)
for scanner.Scan() {
line := scanner.Bytes()
var raw struct {
Timestamp int64 `json:"ts"` // AppendCartEvent writes Unix seconds, so decode as an int64 rather than time.Time
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(line, &raw); err != nil {
eventReplayFailuresTotal.Inc()
continue // skip malformed line
}
instance, ok := registry.Create(raw.Type)
if !ok {
log.Printf("loading failed for unknown mutation type: %s", raw.Type)
eventReplayFailuresTotal.Inc()
continue // skip unknown type
}
if err := json.Unmarshal(raw.Payload, instance); err != nil {
eventMutationErrorsTotal.Inc()
continue
}
// Apply mutation directly using internal registration (bypass AppendCartEvent recursion).
if applyErr := registry.Apply(grain, instance); applyErr != nil {
eventMutationErrorsTotal.Inc()
continue
}
// Update lastChange to the timestamp of this event (sliding inactivity window support).
// The ts field holds Unix seconds; the conversion assumes lastChange is a time.Time, as the original assignment implies.
grain.lastChange = time.Unix(raw.Timestamp, 0)
}
if serr := scanner.Err(); serr != nil {
eventReplayFailuresTotal.Inc()
return fmt.Errorf("scanner error: %w", serr)
}
eventReplayTotal.Inc()
eventReplayDuration.Observe(time.Since(start).Seconds())
return nil
}
// mutationTypeName returns the short struct name for a mutation (pointer aware).
func mutationTypeName(v interface{}) string {
if v == nil {
return "nil"
}
t := reflect.TypeOf(v)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.Name()
}
/*
Future enhancements:
- Compression: gzip large (> N events) logs to reduce disk usage.
- Compaction: periodic snapshot + truncate old events to bound replay latency.
- Checkpoint events: inject cart state snapshots every M mutations.
- Integrity: add a checksum per line for corruption detection (see the sketch after this comment block).
- Multi-writer safety across processes (currently only safe within one process).
*/
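A minimal sketch of the per-line checksum idea from the list above, assuming the same "ts"/"type"/"payload" layout as this file; the "crc" field, the helper names, and the package clause are hypothetical additions, not part of the deleted implementation.

package main

import (
	"encoding/json"
	"fmt"
	"hash/crc32"
	"log"
)

// checkedEventLine mirrors cartEventRecord and adds a crc32 over the other fields.
type checkedEventLine struct {
	Timestamp int64           `json:"ts"`
	Type      string          `json:"type"`
	Payload   json.RawMessage `json:"payload"`
	CRC       uint32          `json:"crc"` // crc32 (IEEE) over "ts|type|payload"
}

func lineChecksum(ts int64, typ string, payload json.RawMessage) uint32 {
	return crc32.ChecksumIEEE([]byte(fmt.Sprintf("%d|%s|%s", ts, typ, payload)))
}

// encodeCheckedLine produces one log line with an embedded checksum.
func encodeCheckedLine(ts int64, typ string, payload json.RawMessage) ([]byte, error) {
	rec := checkedEventLine{Timestamp: ts, Type: typ, Payload: payload, CRC: lineChecksum(ts, typ, payload)}
	return json.Marshal(rec)
}

// verifyCheckedLine re-computes the checksum during replay and rejects corrupted lines.
func verifyCheckedLine(line []byte) error {
	var rec checkedEventLine
	if err := json.Unmarshal(line, &rec); err != nil {
		return err
	}
	if got := lineChecksum(rec.Timestamp, rec.Type, rec.Payload); got != rec.CRC {
		return fmt.Errorf("event line checksum mismatch: got %d want %d", got, rec.CRC)
	}
	return nil
}

func main() {
	line, err := encodeCheckedLine(1700000000, "AddRequest", json.RawMessage(`{"sku":"abc","quantity":1}`))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(line), verifyCheckedLine(line)) // prints the checked line and <nil>
}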

View File

@@ -110,6 +110,9 @@ func main() {
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
return &messages.InitializeCheckout{}
}),
actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
return &messages.OrderCreated{}
}),
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
return &messages.RemoveDelivery{}
}),
@@ -119,6 +122,9 @@ func main() {
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
return &messages.SetPickupPoint{}
}),
actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
return &messages.ClearCartRequest{}
}),
)
diskStorage := actor.NewDiskStorage[CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[CartGrain]{
@@ -372,7 +378,7 @@ func main() {
return
}
err = triggerOrderCompleted(err, syncedServer, order)
err = triggerOrderCompleted(syncedServer, order)
if err != nil {
log.Printf("Error processing cart message: %v\n", err)
w.WriteHeader(http.StatusInternalServerError)
@@ -409,7 +415,7 @@ func main() {
}
func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutOrder) error {
func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error {
mutation := &messages.OrderCreated{
OrderId: order.ID,
Status: order.Status,
@@ -418,10 +424,8 @@ func triggerOrderCompleted(err error, syncedServer *PoolServer, order *CheckoutO
if !ok {
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
}
_, applyErr := syncedServer.pool.Apply(uint64(cid), mutation)
if applyErr == nil {
_ = AppendCartEvent(cid, mutation)
}
_, applyErr := syncedServer.Apply(uint64(cid), mutation)
return applyErr
}

View File

@@ -71,6 +71,6 @@ func AddItem(g *CartGrain, m *messages.AddItem) error {
TaxRate: taxRate,
StoreId: m.StoreId,
})
g.UpdateTotals()
return nil
}

View File

@@ -49,5 +49,6 @@ func ChangeQuantity(g *CartGrain, m *messages.ChangeQuantity) error {
}
g.Items[foundIndex].Quantity = int(m.Quantity)
g.UpdateTotals()
return nil
}

View File

@@ -44,5 +44,6 @@ func RemoveDelivery(g *CartGrain, m *messages.RemoveDelivery) error {
// Remove delivery (order not preserved beyond necessity)
g.Deliveries = append(g.Deliveries[:index], g.Deliveries[index+1:]...)
g.UpdateTotals()
return nil
}

View File

@@ -40,5 +40,6 @@ func RemoveItem(g *CartGrain, m *messages.RemoveItem) error {
}
g.Items = append(g.Items[:index], g.Items[index+1:]...)
g.UpdateTotals()
return nil
}

View File

@@ -1,46 +0,0 @@
package main
// mutation_set_cart_items.go
//
// Registers the SetCartRequest mutation. This mutation replaces the entire list
// of cart items with the provided list (each entry is an AddRequest).
//
// Behavior:
// - Clears existing items (but leaves deliveries intact).
// - Iterates over each AddRequest and delegates to CartGrain.AddItem
// (which performs product lookup, creates AddItem mutation).
// - If any single addition fails, the mutation aborts with an error;
// items added prior to the failure remain (consistent with previous behavior).
// - Totals recalculated after completion via WithTotals().
//
// Notes:
// - Potential optimization: batch product lookups; currently sequential.
// - Consider adding rollback semantics if atomic replacement is desired.
// - Deliveries might reference item IDs that are now invalid—original logic
// also left deliveries untouched. If that becomes an issue, add a cleanup
// pass to remove deliveries whose item IDs no longer exist.
// func HandleSetCartRequest(g *CartGrain, m *messages.SetCartRequest) error {
// if m == nil {
// return fmt.Errorf("SetCartRequest: nil payload")
// }
// // Clear current items (keep deliveries)
// g.mu.Lock()
// g.Items = make([]*CartItem, 0, len(m.Items))
// g.mu.Unlock()
// for _, it := range m.Items {
// if it == nil {
// continue
// }
// if it.Sku == "" || it.Quantity < 1 {
// return fmt.Errorf("SetCartRequest: invalid item (sku='%s' qty=%d)", it.Sku, it.Quantity)
// }
// _, err := g.AddItem(it.Sku, int(it.Quantity), it.Country, it.StoreId)
// if err != nil {
// return fmt.Errorf("SetCartRequest: add sku '%s' failed: %w", it.Sku, err)
// }
// }
// return nil
// }

View File

@@ -49,3 +49,14 @@ func SetPickupPoint(g *CartGrain, m *messages.SetPickupPoint) error {
}
return fmt.Errorf("SetPickupPoint: delivery id %d not found", m.DeliveryId)
}
func ClearCart(g *CartGrain, m *messages.ClearCartRequest) error {
if m == nil {
return fmt.Errorf("ClearCart: nil payload")
}
// TODO: consider rejecting ClearCart while a payment is in progress.
g.Deliveries = g.Deliveries[:0]
g.Items = g.Items[:0]
g.UpdateTotals()
return nil
}

View File

@@ -7,6 +7,7 @@ import (
"log"
"net/http"
"strconv"
"sync"
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
@@ -15,25 +16,25 @@ import (
)
type PoolServer struct {
actor.GrainPool[*CartGrain]
pod_name string
pool actor.GrainPool[*CartGrain]
klarnaClient *KlarnaClient
}
func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer {
return &PoolServer{
GrainPool: pool,
pod_name: pod_name,
pool: pool,
klarnaClient: klarnaClient,
}
}
func (s *PoolServer) ApplyLocal(id CartId, mutation proto.Message) (*CartGrain, error) {
return s.pool.Apply(uint64(id), mutation)
return s.Apply(uint64(id), mutation)
}
func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request, id CartId) error {
grain, err := s.pool.Get(uint64(id))
grain, err := s.Get(uint64(id))
if err != nil {
return err
}
@@ -163,16 +164,42 @@ func (s *PoolServer) HandleQuantityChange(w http.ResponseWriter, r *http.Request
return s.WriteResult(w, reply)
}
type SetCartItems struct {
	Country string `json:"country"`
	Items   []struct {
		Sku      string `json:"sku"`
		Quantity int    `json:"quantity"`
	} `json:"items"`
}
func (s *PoolServer) HandleSetCartItems(w http.ResponseWriter, r *http.Request, id CartId) error {
setCartItems := messages.SetCartRequest{}
setCartItems := SetCartItems{}
err := json.NewDecoder(r.Body).Decode(&setCartItems)
if err != nil {
return err
}
reply, err := s.ApplyLocal(id, &setCartItems)
reply, err := s.ApplyLocal(id, &messages.ClearCartRequest{})
if err != nil {
return err
}
wg := sync.WaitGroup{}
for _, item := range setCartItems.Items {
wg.Add(1)
go func(sku string, quantity int) {
msg, err := GetItemAddMessage(sku, quantity, setCartItems.Country, nil)
if err != nil {
log.Printf("error adding item %s: %v", sku, err)
return
}
reply, err = s.ApplyLocal(id, msg)
if err != nil {
log.Printf("error applying message %v: %v", msg, err)
return
}
wg.Done()
}(item.Sku, item.Quantity)
}
wg.Wait()
return s.WriteResult(w, reply)
}
@@ -241,7 +268,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr
}
// Get current grain state (may be local or remote)
grain, err := s.pool.Get(uint64(id))
grain, err := s.Get(uint64(id))
if err != nil {
return nil, err
}
@@ -261,7 +288,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr
func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) (*CartGrain, error) {
// Persist initialization state via mutation (best-effort)
return s.pool.Apply(uint64(id), &messages.InitializeCheckout{
return s.Apply(uint64(id), &messages.InitializeCheckout{
OrderId: klarnaOrder.ID,
Status: klarnaOrder.Status,
PaymentInProgress: true,
@@ -373,7 +400,7 @@ func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request
func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error {
return func(cartId CartId, w http.ResponseWriter, r *http.Request) error {
if ownerHost, ok := s.pool.OwnerHost(uint64(cartId)); ok {
if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok {
handled, err := ownerHost.Proxy(uint64(cartId), w, r)
if err == nil && handled {
return nil