63 lines
1.3 KiB
Go
63 lines
1.3 KiB
Go
package actor
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
)
|
|
|
|
type DiskStorage[V any] struct {
|
|
*StateStorage
|
|
path string
|
|
}
|
|
|
|
type LogStorage[V any] interface {
|
|
LoadEvents(id uint64, grain Grain[V]) error
|
|
AppendEvent(id uint64, msg proto.Message) error
|
|
}
|
|
|
|
func NewDiskStorage[V any](path string, registry MutationRegistry) LogStorage[V] {
|
|
return &DiskStorage[V]{
|
|
StateStorage: NewState(registry),
|
|
path: path,
|
|
}
|
|
}
|
|
|
|
func (s *DiskStorage[V]) logPath(id uint64) string {
|
|
return filepath.Join(s.path, fmt.Sprintf("%d.events.log", id))
|
|
}
|
|
|
|
func (s *DiskStorage[V]) LoadEvents(id uint64, grain Grain[V]) error {
|
|
path := s.logPath(id)
|
|
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
|
// No log -> nothing to replay
|
|
return nil
|
|
}
|
|
|
|
fh, err := os.Open(path)
|
|
if err != nil {
|
|
return fmt.Errorf("open replay file: %w", err)
|
|
}
|
|
defer fh.Close()
|
|
return s.Load(fh, func(msg proto.Message) {
|
|
s.registry.Apply(grain, msg)
|
|
})
|
|
}
|
|
|
|
func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
|
|
path := s.logPath(id)
|
|
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
log.Printf("failed to open event log file: %v", err)
|
|
return err
|
|
}
|
|
defer fh.Close()
|
|
|
|
return s.Append(fh, msg)
|
|
|
|
}
|