Complete refactor to new grpc control plane and only http proxy for carts #4

Merged
mats merged 75 commits from refactor/http-proxy into main 2025-10-14 22:31:28 +02:00
3 changed files with 29 additions and 20 deletions
Showing only changes of commit 91e398dcc3 - Show all commits

View File

@@ -403,7 +403,7 @@ func main() {
go func() { go func() {
sig := <-sigs sig := <-sigs
fmt.Println("Shutting down due to signal:", sig) fmt.Println("Shutting down due to signal:", sig)
//app.Save() diskStorage.Close()
pool.Close() pool.Close()
done <- true done <- true

View File

@@ -70,8 +70,10 @@ func (s *DiskStorage[V]) save() {
} }
} }
} }
s.queue.Delete(id)
return true return true
}) })
log.Print("Appended carts to disk")
} }
func (s *DiskStorage[V]) logPath(id uint64) string { func (s *DiskStorage[V]) logPath(id uint64) string {
@@ -102,7 +104,6 @@ func (s *DiskStorage[V]) Close() {
func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error { func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
if s.queue != nil { if s.queue != nil {
queue := make([]QueueEvent, 0) queue := make([]QueueEvent, 0)
data, found := s.queue.Load(id) data, found := s.queue.Load(id)
if found { if found {
@@ -112,7 +113,6 @@ func (s *DiskStorage[V]) AppendEvent(id uint64, msg proto.Message) error {
s.queue.Store(id, queue) s.queue.Store(id, queue)
return nil return nil
} else { } else {
path := s.logPath(id) path := s.logPath(id)
fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) fh, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {

View File

@@ -1,6 +1,7 @@
package actor package actor
import ( import (
"bufio"
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
@@ -36,8 +37,9 @@ var ErrUnknownType = errors.New("unknown type")
func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error { func (s *StateStorage) Load(r io.Reader, onMessage func(msg proto.Message)) error {
var err error var err error
var evt *StorageEvent var evt *StorageEvent
scanner := bufio.NewScanner(r)
for err == nil { for err == nil {
evt, err = s.Read(r) evt, err = s.Read(scanner)
if err == nil { if err == nil {
onMessage(evt.Mutation) onMessage(evt.Mutation)
} }
@@ -65,12 +67,17 @@ func (s *StateStorage) Append(io io.Writer, mutation proto.Message, timeStamp ti
if _, err := io.Write(jsonBytes); err != nil { if _, err := io.Write(jsonBytes); err != nil {
return err return err
} }
io.Write([]byte("\n"))
return nil return nil
} }
func (s *StateStorage) Read(r io.Reader) (*StorageEvent, error) { func (s *StateStorage) Read(r *bufio.Scanner) (*StorageEvent, error) {
var event rawEvent var event rawEvent
if err := json.NewDecoder(r).Decode(&event); err != nil {
if r.Scan() {
b := r.Bytes()
err := json.Unmarshal(b, &event)
if err != nil {
return nil, err return nil, err
} }
typeName := event.Type typeName := event.Type
@@ -85,5 +92,7 @@ func (s *StateStorage) Read(r io.Reader) (*StorageEvent, error) {
Type: typeName, Type: typeName,
TimeStamp: event.TimeStamp, TimeStamp: event.TimeStamp,
Mutation: mutation, Mutation: mutation,
}, nil }, r.Err()
}
return nil, io.EOF
} }