Complete refactor to new grpc control plane and only http proxy for carts (#4)
Co-authored-by: matst80 <mats.tornberg@gmail.com> Reviewed-on: https://git.tornberg.me/mats/go-cart-actor/pulls/4 Co-authored-by: Mats Törnberg <mats@tornberg.me> Co-committed-by: Mats Törnberg <mats@tornberg.me>
This commit was merged in pull request #4.
This commit is contained in:
137
pkg/actor/disk_storage.go
Normal file
137
pkg/actor/disk_storage.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package actor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
type QueueEvent struct {
|
||||
TimeStamp time.Time
|
||||
Message proto.Message
|
||||
}
|
||||
|
||||
type DiskStorage[V any] struct {
|
||||
*StateStorage
|
||||
path string
|
||||
done chan struct{}
|
||||
queue *sync.Map // map[uint64][]QueueEvent
|
||||
}
|
||||
|
||||
type LogStorage[V any] interface {
|
||||
LoadEvents(id uint64, grain Grain[V]) error
|
||||
AppendEvent(id uint64, msg ...proto.Message) error
|
||||
}
|
||||
|
||||
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
||||
return &DiskStorage[V]{
|
||||
StateStorage: NewState(registry),
|
||||
path: path,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) SaveLoop(duration time.Duration) {
|
||||
s.queue = &sync.Map{}
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
s.save()
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.save()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) save() {
|
||||
carts := 0
|
||||
lines := 0
|
||||
s.queue.Range(func(key, value any) bool {
|
||||
id := key.(uint64)
|
||||
path := s.logPath(id)
|
||||
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
log.Printf("failed to open event log file: %v", err)
|
||||
return true
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
if qe, ok := value.([]QueueEvent); ok {
|
||||
for _, msg := range qe {
|
||||
if err := s.Append(fh, msg.Message, msg.TimeStamp); err != nil {
|
||||
log.Printf("failed to append event to log file: %v", err)
|
||||
}
|
||||
lines++
|
||||
}
|
||||
}
|
||||
carts++
|
||||
s.queue.Delete(id)
|
||||
return true
|
||||
})
|
||||
if lines > 0 {
|
||||
log.Printf("Appended %d carts and %d lines to disk", carts, lines)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) logPath(id uint64) string {
|
||||
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
|
||||
path := s.logPath(id)
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
// No log -> nothing to replay
|
||||
return nil
|
||||
}
|
||||
|
||||
fh, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open replay file: %w", err)
|
||||
}
|
||||
defer fh.Close()
|
||||
return s.Load(fh, func(msg proto.Message) {
|
||||
s.registry.Apply(grain, msg)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) Close() {
|
||||
s.save()
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
func (s *DiskStorage[V]) AppendEvent(id uint64, msg ...proto.Message) error {
|
||||
if s.queue != nil {
|
||||
queue := make([]QueueEvent, 0)
|
||||
data, found := s.queue.Load(id)
|
||||
if found {
|
||||
queue = data.([]QueueEvent)
|
||||
}
|
||||
for _, m := range msg {
|
||||
queue = append(queue, QueueEvent{Message: m, TimeStamp: time.Now()})
|
||||
}
|
||||
s.queue.Store(id, queue)
|
||||
return nil
|
||||
} else {
|
||||
path := s.logPath(id)
|
||||
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
log.Printf("failed to open event log file: %v", err)
|
||||
return err
|
||||
}
|
||||
defer fh.Close()
|
||||
for _, m := range msg {
|
||||
err = s.Append(fh, m, time.Now())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user