package actor import ( "errors" "fmt" "log" "os" "path/filepath" "sync" "time" "github.com/gogo/protobuf/proto" "github.com/matst80/slask-finder/pkg/messaging" amqp "github.com/rabbitmq/amqp091-go" ) type QueueEvent struct { TimeStamp time.Time Message proto.Message } type DiskStorage[V any] struct { *StateStorage path string done chan struct{} queue *sync.Map // map[uint64][]QueueEvent } type LogStorage[V any] interface { LoadEvents(id uint64, grain Grain[V]) error AppendMutations(id uint64, msg ...proto.Message) error } type LogListener interface { AppendMutations(id uint64, msg ...ApplyResult) } type AmqpListener struct { conn *amqp.Connection } func NewAmqpListener(conn *amqp.Connection) *AmqpListener { return &AmqpListener{ conn: conn, } } func (l *AmqpListener) DefineTopics() { ch, err := l.conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { log.Fatalf("Failed to declare topic mutation: %v", err) } } type CartEvent struct { Id uint64 `json:"id"` Mutations []ApplyResult `json:"mutations"` } func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{ Id: id, Mutations: msg, }) if err != nil { log.Printf("Failed to send mutation event: %v", err) } } func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { return &DiskStorage[V]{ StateStorage: NewState(registry), path: path, done: make(chan struct{}), } } func (s *DiskStorage[V]) SaveLoop(duration time.Duration) { s.queue = &sync.Map{} ticker := time.NewTicker(duration) defer ticker.Stop() for { select { case <-s.done: s.save() return case <-ticker.C: s.save() } } } func (s *DiskStorage[V]) save() { carts := 0 lines := 0 s.queue.Range(func(key, value any) bool { id := key.(uint64) path := s.logPath(id) fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Printf("failed to open event log file: %v", err) return true } defer fh.Close() if qe, ok := value.([]QueueEvent); ok { for _, msg := range qe { if err := s.Append(fh, msg.Message, msg.TimeStamp); err != nil { log.Printf("failed to append event to log file: %v", err) } lines++ } } carts++ s.queue.Delete(id) return true }) if lines > 0 { log.Printf("Appended %d carts and %d lines to disk", carts, lines) } } func (s *DiskStorage[V]) logPath(id uint64) string { return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id)) } func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error { path := s.logPath(id) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { // No log -> nothing to replay return nil } fh, err := os.Open(path) if err != nil { return fmt.Errorf("open replay file: %w", err) } defer fh.Close() return s.Load(fh, func(msg proto.Message) { s.registry.Apply(grain, msg) }) } func (s *DiskStorage[V]) Close() { s.save() close(s.done) } func (s *DiskStorage[V]) AppendMutations(id uint64, msg ...proto.Message) error { if s.queue != nil { queue := make([]QueueEvent, 0) data, found := s.queue.Load(id) if found { queue = data.([]QueueEvent) } for _, m := range msg { queue = append(queue, QueueEvent{Message: m, TimeStamp: time.Now()}) } s.queue.Store(id, queue) return nil } else { path := s.logPath(id) fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Printf("failed to open event log file: %v", err) return err } defer fh.Close() for _, m := range msg { err = s.Append(fh, m, time.Now()) } return err } }