feature/backoffice #6
@@ -8,17 +8,20 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileServer struct {
|
type FileServer struct {
|
||||||
// Define fields here
|
// Define fields here
|
||||||
dataDir string
|
dataDir string
|
||||||
|
storage actor.LogStorage[cart.CartGrain]
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileServer(dataDir string) *FileServer {
|
func NewFileServer(dataDir string) *FileServer {
|
||||||
@@ -27,21 +30,29 @@ func NewFileServer(dataDir string) *FileServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isValidId(id string) (uint64, bool) {
|
||||||
|
if nr, err := strconv.ParseUint(id, 10, 64); err == nil {
|
||||||
|
return nr, true
|
||||||
|
}
|
||||||
|
if nr, ok := cart.ParseCartId(id); ok {
|
||||||
|
return uint64(nr), true
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
func isValidFileId(name string) (uint64, bool) {
|
func isValidFileId(name string) (uint64, bool) {
|
||||||
|
|
||||||
parts := strings.Split(name, ".")
|
parts := strings.Split(name, ".")
|
||||||
if len(parts) > 1 && parts[1] == "events" {
|
if len(parts) > 1 && parts[1] == "events" {
|
||||||
idStr := parts[0]
|
idStr := parts[0]
|
||||||
if _, err := strconv.ParseUint(idStr, 10, 64); err != nil {
|
|
||||||
return 0, false
|
return isValidId(idStr)
|
||||||
}
|
|
||||||
if id, err := strconv.ParseUint(idStr, 10, 64); err == nil {
|
|
||||||
return id, true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
||||||
|
|
||||||
func listCartFiles(dir string) ([]CartFileInfo, error) {
|
func listCartFiles(dir string) ([]CartFileInfo, error) {
|
||||||
entries, err := os.ReadDir(dir)
|
entries, err := os.ReadDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -76,20 +87,20 @@ func listCartFiles(dir string) ([]CartFileInfo, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readRawLogLines(path string) ([]string, error) {
|
func readRawLogLines(path string) ([]json.RawMessage, error) {
|
||||||
fh, err := os.Open(path)
|
fh, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
lines := make([]string, 0, 64)
|
lines := make([]json.RawMessage, 0, 64)
|
||||||
s := bufio.NewScanner(fh)
|
s := bufio.NewScanner(fh)
|
||||||
// increase buffer to handle larger JSON lines
|
// increase buffer to handle larger JSON lines
|
||||||
buf := make([]byte, 0, 1024*1024)
|
buf := make([]byte, 0, 1024*1024)
|
||||||
s.Buffer(buf, 1024*1024)
|
s.Buffer(buf, 1024*1024)
|
||||||
for s.Scan() {
|
for s.Scan() {
|
||||||
line := strings.TrimSpace(s.Text())
|
line := s.Bytes()
|
||||||
if line == "" {
|
if line == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
lines = append(lines, line)
|
lines = append(lines, line)
|
||||||
@@ -122,40 +133,41 @@ func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type JsonError struct {
|
||||||
|
Error string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
idStr := r.PathValue("id")
|
idStr := r.PathValue("id")
|
||||||
if idStr == "" {
|
if idStr == "" {
|
||||||
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"})
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "missing id"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
id, ok := isValidId(idStr)
|
||||||
id, err := strconv.ParseUint(idStr, 10, 64)
|
if !ok {
|
||||||
if err != nil {
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"})
|
||||||
if cartId, ok := cart.ParseCartId(idStr); !ok {
|
|
||||||
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"})
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
id = uint64(cartId)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// reconstruct state from event log if present
|
// reconstruct state from event log if present
|
||||||
grain := cart.NewCartGrain(id, time.Now())
|
grain := cart.NewCartGrain(id, time.Now())
|
||||||
if globalDisk != nil {
|
|
||||||
_ = globalDisk.LoadEvents(id, grain)
|
err := fs.storage.LoadEvents(id, grain)
|
||||||
|
if err != nil {
|
||||||
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
||||||
info, err := os.Stat(path)
|
info, err := os.Stat(path)
|
||||||
if err != nil && errors.Is(err, os.ErrNotExist) {
|
if err != nil && errors.Is(err, os.ErrNotExist) {
|
||||||
writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"})
|
writeJSON(w, http.StatusNotFound, JsonError{Error: "cart not found"})
|
||||||
return
|
return
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lines, err := readRawLogLines(path)
|
lines, err := readRawLogLines(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
@@ -167,6 +179,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
"size": info.Size(),
|
"size": info.Size(),
|
||||||
"modified": info.ModTime(),
|
"modified": info.ModTime(),
|
||||||
"path": path,
|
"path": path,
|
||||||
|
"system": info.Sys(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
@@ -30,8 +29,6 @@ func envOrDefault(key, def string) string {
|
|||||||
return def
|
return def
|
||||||
}
|
}
|
||||||
|
|
||||||
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
|
||||||
|
|
||||||
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
||||||
|
|
||||||
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
||||||
|
|||||||
@@ -99,6 +99,11 @@ type MutationContext struct {
|
|||||||
VoucherService voucher.Service
|
VoucherService voucher.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CartChangeEvent struct {
|
||||||
|
CartId cart.CartId `json:"cartId"`
|
||||||
|
Mutations []actor.ApplyResult `json:"mutations"`
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
controlPlaneConfig := actor.DefaultServerConfig()
|
controlPlaneConfig := actor.DefaultServerConfig()
|
||||||
@@ -138,7 +143,12 @@ func main() {
|
|||||||
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
amqpListener := actor.NewAmqpListener(conn)
|
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
|
||||||
|
return &CartChangeEvent{
|
||||||
|
CartId: cart.CartId(id),
|
||||||
|
Mutations: msg,
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
amqpListener.DefineTopics()
|
amqpListener.DefineTopics()
|
||||||
pool.AddListener(amqpListener)
|
pool.AddListener(amqpListener)
|
||||||
|
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/matst80/slask-finder/pkg/messaging"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type QueueEvent struct {
|
type QueueEvent struct {
|
||||||
@@ -31,46 +29,6 @@ type LogStorage[V any] interface {
|
|||||||
AppendMutations(id uint64, msg ...proto.Message) error
|
AppendMutations(id uint64, msg ...proto.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type LogListener interface {
|
|
||||||
AppendMutations(id uint64, msg ...ApplyResult)
|
|
||||||
}
|
|
||||||
|
|
||||||
type AmqpListener struct {
|
|
||||||
conn *amqp.Connection
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAmqpListener(conn *amqp.Connection) *AmqpListener {
|
|
||||||
return &AmqpListener{
|
|
||||||
conn: conn,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) DefineTopics() {
|
|
||||||
ch, err := l.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to open a channel: %v", err)
|
|
||||||
}
|
|
||||||
defer ch.Close()
|
|
||||||
if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil {
|
|
||||||
log.Fatalf("Failed to declare topic mutation: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type CartEvent struct {
|
|
||||||
Id uint64 `json:"id"`
|
|
||||||
Mutations []ApplyResult `json:"mutations"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) {
|
|
||||||
err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{
|
|
||||||
Id: id,
|
|
||||||
Mutations: msg,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to send mutation event: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
||||||
return &DiskStorage[V]{
|
return &DiskStorage[V]{
|
||||||
StateStorage: NewState(registry),
|
StateStorage: NewState(registry),
|
||||||
|
|||||||
47
pkg/actor/log_listerner.go
Normal file
47
pkg/actor/log_listerner.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package actor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/matst80/slask-finder/pkg/messaging"
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LogListener interface {
|
||||||
|
AppendMutations(id uint64, msg ...ApplyResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
type AmqpListener struct {
|
||||||
|
conn *amqp.Connection
|
||||||
|
transformer func(id uint64, msg []ApplyResult) (any, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener {
|
||||||
|
return &AmqpListener{
|
||||||
|
conn: conn,
|
||||||
|
transformer: transformer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AmqpListener) DefineTopics() {
|
||||||
|
ch, err := l.conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to open a channel: %v", err)
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil {
|
||||||
|
log.Fatalf("Failed to declare topic mutation: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) {
|
||||||
|
data, err := l.transformer(id, msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to transform mutation event: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = messaging.SendChange(l.conn, "cart", "mutation", data)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to send mutation event: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user