Compare commits
77 Commits
16948fcbdb
...
refactor/h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8fd0a21b1 | ||
|
|
137958b83b | ||
|
|
ee5d714220 | ||
|
|
5ca21e214f | ||
|
|
2df64e715e | ||
|
|
e0ea03f6cf | ||
|
|
a7cbdcd0da | ||
|
|
f0b6a733f1 | ||
|
|
c86a189795 | ||
|
|
1565cfdaba | ||
|
|
4bec3f8cc0 | ||
|
|
5e2025e30f | ||
| e908a4130b | |||
| 606df6218a | |||
|
|
9f83717ea9 | ||
|
|
b029a9d05a | ||
|
|
cbbae3dfd3 | ||
|
|
b842efae6a | ||
|
|
bb9a595d25 | ||
|
|
de62b0664d | ||
|
|
0b9c14c231 | ||
|
|
7a79efbb9f | ||
|
|
6c44a03dd1 | ||
|
|
11e82de114 | ||
|
|
67b9a739fe | ||
|
|
5c67579464 | ||
|
|
22ac64c14d | ||
| 27c866ce58 | |||
| c9701803d1 | |||
| dc12e8c3d5 | |||
| 30b98f93ce | |||
| 8e60cc2239 | |||
|
|
91e398dcc3 | ||
|
|
f3e92c7d65 | ||
|
|
6fbd62936f | ||
|
|
2bf0475335 | ||
|
|
10e1affad0 | ||
|
|
e26ad676b3 | ||
|
|
9fc3871e84 | ||
|
|
6094da99f3 | ||
|
|
1575b3a829 | ||
|
|
c6671ceef0 | ||
|
|
6cb46b4e16 | ||
|
|
4e4d5371ec | ||
|
|
c4f0c67580 | ||
|
|
6a9ebbf453 | ||
|
|
ea35871676 | ||
|
|
7ad28966fb | ||
|
|
a7a778caaf | ||
|
|
873fb6c97b | ||
|
|
33ef868295 | ||
|
|
8d73f856bf | ||
|
|
b591e3d3f5 | ||
|
|
b8266d80f9 | ||
| 0ba7410162 | |||
| 9df2f3362a | |||
| 6345d91ef7 | |||
| 4cacc0ee2d | |||
| 24cd0b6ad7 | |||
|
|
e48a2590bd | ||
|
|
b0e6c8eca8 | ||
|
|
7814f33a06 | ||
|
|
fb111ebf97 | ||
|
|
5525e91ecc | ||
|
|
f8c8ad56c7 | ||
|
|
09a68db8d5 | ||
|
|
30c89a0394 | ||
|
|
d6563d0b3a | ||
|
|
2a2ce247d5 | ||
|
|
159253b8b0 | ||
|
|
c30be581cd | ||
|
|
716f1121aa | ||
|
|
12d87036f6 | ||
|
|
e7c67fbb9b | ||
|
|
b97eb8f285 | ||
|
|
2697832d98 | ||
|
|
4c973b239f |
@@ -47,9 +47,6 @@ jobs:
|
|||||||
docker push registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }}
|
docker push registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }}
|
||||||
- name: Apply deployment manifests
|
- name: Apply deployment manifests
|
||||||
run: kubectl apply -f deployment/deployment.yaml -n cart
|
run: kubectl apply -f deployment/deployment.yaml -n cart
|
||||||
- name: Rollout amd64 backoffice deployment
|
|
||||||
run: |
|
|
||||||
kubectl rollout restart deployment/cart-backoffice-x86 -n cart
|
|
||||||
- name: Rollout amd64 deployment (pin to version)
|
- name: Rollout amd64 deployment (pin to version)
|
||||||
run: |
|
run: |
|
||||||
kubectl set image deployment/cart-actor-x86 -n cart cart-actor-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }}
|
kubectl set image deployment/cart-actor-x86 -n cart cart-actor-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }}
|
||||||
|
|||||||
@@ -59,13 +59,6 @@ RUN --mount=type=cache,target=/go/build-cache \
|
|||||||
-X main.BuildDate=${BUILD_DATE}" \
|
-X main.BuildDate=${BUILD_DATE}" \
|
||||||
-o /out/go-cart-actor ./cmd/cart
|
-o /out/go-cart-actor ./cmd/cart
|
||||||
|
|
||||||
RUN --mount=type=cache,target=/go/build-cache \
|
|
||||||
go build -trimpath -ldflags="-s -w \
|
|
||||||
-X main.Version=${VERSION} \
|
|
||||||
-X main.GitCommit=${GIT_COMMIT} \
|
|
||||||
-X main.BuildDate=${BUILD_DATE}" \
|
|
||||||
-o /out/go-cart-backoffice ./cmd/backoffice
|
|
||||||
|
|
||||||
############################
|
############################
|
||||||
# Runtime Stage
|
# Runtime Stage
|
||||||
############################
|
############################
|
||||||
@@ -74,7 +67,6 @@ FROM gcr.io/distroless/static-debian12:nonroot AS runtime
|
|||||||
WORKDIR /
|
WORKDIR /
|
||||||
|
|
||||||
COPY --from=build /out/go-cart-actor /go-cart-actor
|
COPY --from=build /out/go-cart-actor /go-cart-actor
|
||||||
COPY --from=build /out/go-cart-backoffice /go-cart-backoffice
|
|
||||||
|
|
||||||
# Document (not expose forcibly) typical ports: 8080 (HTTP), 1337 (gRPC)
|
# Document (not expose forcibly) typical ports: 8080 (HTTP), 1337 (gRPC)
|
||||||
EXPOSE 8080 1337
|
EXPOSE 8080 1337
|
||||||
|
|||||||
@@ -1,185 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"regexp"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/actor"
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
)
|
|
||||||
|
|
||||||
type FileServer struct {
|
|
||||||
// Define fields here
|
|
||||||
dataDir string
|
|
||||||
storage actor.LogStorage[cart.CartGrain]
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFileServer(dataDir string) *FileServer {
|
|
||||||
return &FileServer{
|
|
||||||
dataDir: dataDir,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidId(id string) (uint64, bool) {
|
|
||||||
if nr, err := strconv.ParseUint(id, 10, 64); err == nil {
|
|
||||||
return nr, true
|
|
||||||
}
|
|
||||||
if nr, ok := cart.ParseCartId(id); ok {
|
|
||||||
return uint64(nr), true
|
|
||||||
}
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidFileId(name string) (uint64, bool) {
|
|
||||||
|
|
||||||
parts := strings.Split(name, ".")
|
|
||||||
if len(parts) > 1 && parts[1] == "events" {
|
|
||||||
idStr := parts[0]
|
|
||||||
|
|
||||||
return isValidId(idStr)
|
|
||||||
}
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
|
||||||
|
|
||||||
func listCartFiles(dir string) ([]CartFileInfo, error) {
|
|
||||||
entries, err := os.ReadDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, os.ErrNotExist) {
|
|
||||||
return []CartFileInfo{}, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
out := make([]CartFileInfo, 0)
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.IsDir() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
id, valid := isValidFileId(e.Name())
|
|
||||||
if !valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := e.Info()
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
info.Sys()
|
|
||||||
out = append(out, CartFileInfo{
|
|
||||||
ID: fmt.Sprintf("%d", id),
|
|
||||||
CartId: cart.CartId(id),
|
|
||||||
Size: info.Size(),
|
|
||||||
Modified: info.ModTime(),
|
|
||||||
System: info.Sys(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func readRawLogLines(path string) ([]json.RawMessage, error) {
|
|
||||||
fh, err := os.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer fh.Close()
|
|
||||||
lines := make([]json.RawMessage, 0, 64)
|
|
||||||
s := bufio.NewScanner(fh)
|
|
||||||
// increase buffer to handle larger JSON lines
|
|
||||||
buf := make([]byte, 0, 1024*1024)
|
|
||||||
s.Buffer(buf, 1024*1024)
|
|
||||||
for s.Scan() {
|
|
||||||
line := s.Bytes()
|
|
||||||
if line == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
lines = append(lines, line)
|
|
||||||
}
|
|
||||||
if err := s.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return lines, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeJSON(w http.ResponseWriter, status int, v any) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.WriteHeader(status)
|
|
||||||
_ = json.NewEncoder(w).Encode(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
list, err := listCartFiles(fs.dataDir)
|
|
||||||
if err != nil {
|
|
||||||
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// sort by modified desc
|
|
||||||
sort.Slice(list, func(i, j int) bool { return list[i].Modified.After(list[j].Modified) })
|
|
||||||
|
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
|
||||||
"count": len(list),
|
|
||||||
"carts": list,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type JsonError struct {
|
|
||||||
Error string `json:"error"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
idStr := r.PathValue("id")
|
|
||||||
if idStr == "" {
|
|
||||||
writeJSON(w, http.StatusBadRequest, JsonError{Error: "missing id"})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
id, ok := isValidId(idStr)
|
|
||||||
if !ok {
|
|
||||||
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"})
|
|
||||||
}
|
|
||||||
// reconstruct state from event log if present
|
|
||||||
grain := cart.NewCartGrain(id, time.Now())
|
|
||||||
|
|
||||||
err := fs.storage.LoadEvents(id, grain)
|
|
||||||
if err != nil {
|
|
||||||
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
|
||||||
info, err := os.Stat(path)
|
|
||||||
if err != nil && errors.Is(err, os.ErrNotExist) {
|
|
||||||
writeJSON(w, http.StatusNotFound, JsonError{Error: "cart not found"})
|
|
||||||
return
|
|
||||||
} else if err != nil {
|
|
||||||
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
lines, err := readRawLogLines(path)
|
|
||||||
if err != nil {
|
|
||||||
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
|
||||||
"id": id,
|
|
||||||
"cartId": cart.CartId(id).String(),
|
|
||||||
"state": grain,
|
|
||||||
"rawLog": lines,
|
|
||||||
"meta": map[string]any{
|
|
||||||
"size": info.Size(),
|
|
||||||
"modified": info.ModTime(),
|
|
||||||
"path": path,
|
|
||||||
"system": info.Sys(),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -1,248 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"crypto/sha1"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/binary"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Hub manages websocket clients and broadcasts messages to them.
|
|
||||||
type Hub struct {
|
|
||||||
register chan *Client
|
|
||||||
unregister chan *Client
|
|
||||||
broadcast chan []byte
|
|
||||||
clients map[*Client]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client represents a single websocket client connection.
|
|
||||||
type Client struct {
|
|
||||||
hub *Hub
|
|
||||||
conn net.Conn
|
|
||||||
send chan []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHub constructs a new Hub instance.
|
|
||||||
func NewHub() *Hub {
|
|
||||||
return &Hub{
|
|
||||||
register: make(chan *Client),
|
|
||||||
unregister: make(chan *Client),
|
|
||||||
broadcast: make(chan []byte, 1024),
|
|
||||||
clients: make(map[*Client]bool),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts the hub event loop.
|
|
||||||
func (h *Hub) Run() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case c := <-h.register:
|
|
||||||
h.clients[c] = true
|
|
||||||
case c := <-h.unregister:
|
|
||||||
if _, ok := h.clients[c]; ok {
|
|
||||||
delete(h.clients, c)
|
|
||||||
close(c.send)
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}
|
|
||||||
case msg := <-h.broadcast:
|
|
||||||
for c := range h.clients {
|
|
||||||
select {
|
|
||||||
case c.send <- msg:
|
|
||||||
default:
|
|
||||||
// Client is slow or dead; drop it.
|
|
||||||
delete(h.clients, c)
|
|
||||||
close(c.send)
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// computeAccept computes the Sec-WebSocket-Accept header value.
|
|
||||||
func computeAccept(key string) string {
|
|
||||||
const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
|
||||||
h := sha1.New()
|
|
||||||
h.Write([]byte(key + magic))
|
|
||||||
return base64.StdEncoding.EncodeToString(h.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServeWS upgrades the HTTP request to a WebSocket connection and registers a client.
|
|
||||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
|
|
||||||
http.Error(w, "upgrade required", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := r.Header.Get("Sec-WebSocket-Key")
|
|
||||||
if key == "" {
|
|
||||||
http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accept := computeAccept(key)
|
|
||||||
|
|
||||||
hj, ok := w.(http.Hijacker)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "websocket not supported", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
conn, buf, err := hj.Hijack()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the upgrade response
|
|
||||||
response := "HTTP/1.1 101 Switching Protocols\r\n" +
|
|
||||||
"Upgrade: websocket\r\n" +
|
|
||||||
"Connection: Upgrade\r\n" +
|
|
||||||
"Sec-WebSocket-Accept: " + accept + "\r\n" +
|
|
||||||
"\r\n"
|
|
||||||
if _, err := buf.WriteString(response); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buf.Flush(); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &Client{
|
|
||||||
hub: h,
|
|
||||||
conn: conn,
|
|
||||||
send: make(chan []byte, 256),
|
|
||||||
}
|
|
||||||
h.register <- client
|
|
||||||
go client.writePump()
|
|
||||||
go client.readPump()
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeWSFrame writes a single WebSocket frame to the writer.
|
|
||||||
func writeWSFrame(w io.Writer, opcode byte, payload []byte) error {
|
|
||||||
// FIN set, opcode as provided
|
|
||||||
header := []byte{0x80 | (opcode & 0x0F)}
|
|
||||||
l := len(payload)
|
|
||||||
switch {
|
|
||||||
case l < 126:
|
|
||||||
header = append(header, byte(l))
|
|
||||||
case l <= 65535:
|
|
||||||
ext := make([]byte, 2)
|
|
||||||
binary.BigEndian.PutUint16(ext, uint16(l))
|
|
||||||
header = append(header, 126)
|
|
||||||
header = append(header, ext...)
|
|
||||||
default:
|
|
||||||
ext := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint64(ext, uint64(l))
|
|
||||||
header = append(header, 127)
|
|
||||||
header = append(header, ext...)
|
|
||||||
}
|
|
||||||
if _, err := w.Write(header); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if l > 0 {
|
|
||||||
if _, err := w.Write(payload); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// readPump handles control frames from the client and discards other incoming frames.
|
|
||||||
// This server is broadcast-only to clients.
|
|
||||||
func (c *Client) readPump() {
|
|
||||||
defer func() {
|
|
||||||
c.hub.unregister <- c
|
|
||||||
}()
|
|
||||||
reader := bufio.NewReader(c.conn)
|
|
||||||
for {
|
|
||||||
// Read first two bytes
|
|
||||||
b1, err := reader.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b2, err := reader.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
opcode := b1 & 0x0F
|
|
||||||
masked := (b2 & 0x80) != 0
|
|
||||||
length := int64(b2 & 0x7F)
|
|
||||||
if length == 126 {
|
|
||||||
ext := make([]byte, 2)
|
|
||||||
if _, err := io.ReadFull(reader, ext); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
length = int64(binary.BigEndian.Uint16(ext))
|
|
||||||
} else if length == 127 {
|
|
||||||
ext := make([]byte, 8)
|
|
||||||
if _, err := io.ReadFull(reader, ext); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
length = int64(binary.BigEndian.Uint64(ext))
|
|
||||||
}
|
|
||||||
var maskKey [4]byte
|
|
||||||
if masked {
|
|
||||||
if _, err := io.ReadFull(reader, maskKey[:]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle Ping -> Pong
|
|
||||||
if opcode == 0x9 && length <= 125 {
|
|
||||||
payload := make([]byte, length)
|
|
||||||
if _, err := io.ReadFull(reader, payload); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Unmask if masked
|
|
||||||
if masked {
|
|
||||||
for i := int64(0); i < length; i++ {
|
|
||||||
payload[i] ^= maskKey[i%4]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close frame
|
|
||||||
if opcode == 0x8 {
|
|
||||||
// Drain payload if any, then exit
|
|
||||||
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// For other frames, just discard payload
|
|
||||||
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// writePump sends queued messages to the client and pings periodically to keep the connection alive.
|
|
||||||
func (c *Client) writePump() {
|
|
||||||
ticker := time.NewTicker(30 * time.Second)
|
|
||||||
defer func() {
|
|
||||||
ticker.Stop()
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg, ok := <-c.send:
|
|
||||||
if !ok {
|
|
||||||
// try to send close frame
|
|
||||||
_ = writeWSFrame(c.conn, 0x8, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := writeWSFrame(c.conn, 0x1, msg); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-ticker.C:
|
|
||||||
// Send a ping to keep connections alive behind proxies
|
|
||||||
_ = writeWSFrame(c.conn, 0x9, []byte("ping"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,151 +1,5 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
"github.com/matst80/slask-finder/pkg/messaging"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
type CartFileInfo struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
CartId cart.CartId `json:"cartId"`
|
|
||||||
Size int64 `json:"size"`
|
|
||||||
Modified time.Time `json:"modified"`
|
|
||||||
System any `json:"system"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func envOrDefault(key, def string) string {
|
|
||||||
if v := os.Getenv(key); v != "" {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
return def
|
|
||||||
}
|
|
||||||
|
|
||||||
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
|
||||||
|
|
||||||
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation")
|
|
||||||
if err != nil {
|
|
||||||
_ = ch.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer ch.Close()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case m, ok := <-msgs:
|
|
||||||
if !ok {
|
|
||||||
log.Fatalf("connection closed")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Log and broadcast to all websocket clients
|
|
||||||
log.Printf("mutation event: %s", string(m.Body))
|
|
||||||
|
|
||||||
if hub != nil {
|
|
||||||
select {
|
|
||||||
case hub.broadcast <- m.Body:
|
|
||||||
default:
|
|
||||||
// if hub queue is full, drop to avoid blocking
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := m.Ack(false); err != nil {
|
|
||||||
log.Printf("error acknowledging message: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
dataDir := envOrDefault("DATA_DIR", "data")
|
// Your code here
|
||||||
addr := envOrDefault("ADDR", ":8080")
|
|
||||||
amqpURL := os.Getenv("AMQP_URL")
|
|
||||||
|
|
||||||
_ = os.MkdirAll(dataDir, 0755)
|
|
||||||
|
|
||||||
reg := cart.NewCartMultationRegistry()
|
|
||||||
globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
|
|
||||||
|
|
||||||
fs := NewFileServer(dataDir)
|
|
||||||
|
|
||||||
hub := NewHub()
|
|
||||||
go hub.Run()
|
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
|
||||||
mux.HandleFunc("GET /carts", fs.CartsHandler)
|
|
||||||
mux.HandleFunc("GET /cart/{id}", fs.CartHandler)
|
|
||||||
mux.HandleFunc("/ws", hub.ServeWS)
|
|
||||||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
_, _ = w.Write([]byte("ok"))
|
|
||||||
})
|
|
||||||
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
_, _ = w.Write([]byte("ok"))
|
|
||||||
})
|
|
||||||
mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
_, _ = w.Write([]byte("ok"))
|
|
||||||
})
|
|
||||||
|
|
||||||
// Global CORS middleware allowing all origins and handling preflight
|
|
||||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
||||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
|
|
||||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
|
|
||||||
w.Header().Set("Access-Control-Expose-Headers", "*")
|
|
||||||
if r.Method == http.MethodOptions {
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
mux.ServeHTTP(w, r)
|
|
||||||
})
|
|
||||||
|
|
||||||
srv := &http.Server{
|
|
||||||
Addr: addr,
|
|
||||||
Handler: handler,
|
|
||||||
ReadTimeout: 10 * time.Second,
|
|
||||||
WriteTimeout: 30 * time.Second,
|
|
||||||
IdleTimeout: 60 * time.Second,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if amqpURL != "" {
|
|
||||||
conn, err := amqp.Dial(amqpURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to connect to RabbitMQ: %w", err)
|
|
||||||
}
|
|
||||||
if err := startMutationConsumer(ctx, conn, hub); err != nil {
|
|
||||||
log.Printf("AMQP listener disabled: %v", err)
|
|
||||||
} else {
|
|
||||||
log.Printf("AMQP listener connected")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir)
|
|
||||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
||||||
log.Fatalf("http server error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// server stopped
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,48 +9,42 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type AmqpOrderHandler struct {
|
type AmqpOrderHandler struct {
|
||||||
conn *amqp.Connection
|
Url string
|
||||||
|
Connection *amqp.Connection
|
||||||
|
Channel *amqp.Channel
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAmqpOrderHandler(conn *amqp.Connection) *AmqpOrderHandler {
|
func (h *AmqpOrderHandler) Connect() error {
|
||||||
return &AmqpOrderHandler{
|
conn, err := amqp.Dial(h.Url)
|
||||||
conn: conn,
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
||||||
}
|
}
|
||||||
|
h.Connection = conn
|
||||||
|
|
||||||
func (h *AmqpOrderHandler) DefineTopics() error {
|
ch, err := conn.Channel()
|
||||||
ch, err := h.conn.Channel()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open a channel: %w", err)
|
return fmt.Errorf("failed to open a channel: %w", err)
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
h.Channel = ch
|
||||||
|
|
||||||
err = ch.ExchangeDeclare(
|
|
||||||
"orders", // name
|
|
||||||
"direct", // type
|
|
||||||
true, // durable
|
|
||||||
false, // auto-deleted
|
|
||||||
false, // internal
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to declare an exchange: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *AmqpOrderHandler) OrderCompleted(body []byte) error {
|
func (h *AmqpOrderHandler) Close() error {
|
||||||
ch, err := h.conn.Channel()
|
if h.Channel != nil {
|
||||||
if err != nil {
|
h.Channel.Close()
|
||||||
return fmt.Errorf("failed to open a channel: %w", err)
|
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
if h.Connection != nil {
|
||||||
|
return h.Connection.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *AmqpOrderHandler) OrderCompleted(body []byte) error {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
return ch.PublishWithContext(ctx,
|
err := h.Channel.PublishWithContext(ctx,
|
||||||
"orders", // exchange
|
"orders", // exchange
|
||||||
"new", // routing key
|
"new", // routing key
|
||||||
false, // mandatory
|
false, // mandatory
|
||||||
@@ -59,5 +53,9 @@ func (h *AmqpOrderHandler) OrderCompleted(body []byte) error {
|
|||||||
ContentType: "application/json",
|
ContentType: "application/json",
|
||||||
Body: body,
|
Body: body,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to publish a message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -137,22 +138,6 @@ func (v *Voucher) AppliesTo(cart *CartGrain) ([]*CartItem, bool) {
|
|||||||
return cart.Items, true
|
return cart.Items, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCartGrain(id uint64, ts time.Time) *CartGrain {
|
|
||||||
return &CartGrain{
|
|
||||||
lastItemId: 0,
|
|
||||||
lastDeliveryId: 0,
|
|
||||||
lastVoucherId: 0,
|
|
||||||
lastAccess: ts,
|
|
||||||
lastChange: ts,
|
|
||||||
TotalDiscount: NewPrice(),
|
|
||||||
Vouchers: []*Voucher{},
|
|
||||||
Deliveries: []*CartDelivery{},
|
|
||||||
Id: CartId(id),
|
|
||||||
Items: []*CartItem{},
|
|
||||||
TotalPrice: NewPrice(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CartGrain) GetId() uint64 {
|
func (c *CartGrain) GetId() uint64 {
|
||||||
return uint64(c.Id)
|
return uint64(c.Id)
|
||||||
}
|
}
|
||||||
@@ -170,6 +155,22 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) {
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getInt(data float64, ok bool) (int, error) {
|
||||||
|
if !ok {
|
||||||
|
return 0, fmt.Errorf("invalid type")
|
||||||
|
}
|
||||||
|
return int(data), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) {
|
||||||
|
// cartItem, err := getItemData(sku, qty, country)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, err
|
||||||
|
// }
|
||||||
|
// cartItem.StoreId = storeId
|
||||||
|
// return c.Apply(cartItem, false)
|
||||||
|
// }
|
||||||
|
|
||||||
func (c *CartGrain) GetState() ([]byte, error) {
|
func (c *CartGrain) GetState() ([]byte, error) {
|
||||||
return json.Marshal(c)
|
return json.Marshal(c)
|
||||||
}
|
}
|
||||||
@@ -259,10 +260,6 @@ func (c *CartGrain) UpdateTotals() {
|
|||||||
for _, voucher := range c.Vouchers {
|
for _, voucher := range c.Vouchers {
|
||||||
if _, ok := voucher.AppliesTo(c); ok {
|
if _, ok := voucher.AppliesTo(c); ok {
|
||||||
value := NewPriceFromIncVat(voucher.Value, 25)
|
value := NewPriceFromIncVat(voucher.Value, 25)
|
||||||
if c.TotalPrice.IncVat <= value.IncVat {
|
|
||||||
// don't apply discounts to more than the total price
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
c.TotalDiscount.Add(*value)
|
c.TotalDiscount.Add(*value)
|
||||||
c.TotalPrice.Subtract(*value)
|
c.TotalPrice.Subtract(*value)
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -3,8 +3,6 @@ package main
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// CheckoutMeta carries the external / URL metadata required to build a
|
// CheckoutMeta carries the external / URL metadata required to build a
|
||||||
@@ -35,7 +33,7 @@ type CheckoutMeta struct {
|
|||||||
//
|
//
|
||||||
// If you later need to support different tax rates per line, you can extend
|
// If you later need to support different tax rates per line, you can extend
|
||||||
// CartItem / Delivery to expose that data and propagate it here.
|
// CartItem / Delivery to expose that data and propagate it here.
|
||||||
func BuildCheckoutOrderPayload(grain *cart.CartGrain, meta *CheckoutMeta) ([]byte, *CheckoutOrder, error) {
|
func BuildCheckoutOrderPayload(grain *CartGrain, meta *CheckoutMeta) ([]byte, *CheckoutOrder, error) {
|
||||||
if grain == nil {
|
if grain == nil {
|
||||||
return nil, nil, fmt.Errorf("nil grain")
|
return nil, nil, fmt.Errorf("nil grain")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/actor"
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/discovery"
|
"git.tornberg.me/go-cart-actor/pkg/discovery"
|
||||||
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/proxy"
|
"git.tornberg.me/go-cart-actor/pkg/proxy"
|
||||||
@@ -21,7 +20,6 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
@@ -47,7 +45,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type App struct {
|
type App struct {
|
||||||
pool *actor.SimpleGrainPool[cart.CartGrain]
|
pool *actor.SimpleGrainPool[CartGrain]
|
||||||
}
|
}
|
||||||
|
|
||||||
var podIp = os.Getenv("POD_IP")
|
var podIp = os.Getenv("POD_IP")
|
||||||
@@ -99,24 +97,63 @@ type MutationContext struct {
|
|||||||
VoucherService voucher.Service
|
VoucherService voucher.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
type CartChangeEvent struct {
|
|
||||||
CartId cart.CartId `json:"cartId"`
|
|
||||||
Mutations []actor.ApplyResult `json:"mutations"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
controlPlaneConfig := actor.DefaultServerConfig()
|
controlPlaneConfig := actor.DefaultServerConfig()
|
||||||
|
|
||||||
reg := cart.NewCartMultationRegistry()
|
reg := actor.NewMutationRegistry()
|
||||||
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
|
reg.RegisterMutations(
|
||||||
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
|
actor.NewMutation(AddItem, func() *messages.AddItem {
|
||||||
|
return &messages.AddItem{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
|
||||||
|
return &messages.ChangeQuantity{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
|
||||||
|
return &messages.RemoveItem{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
|
||||||
|
return &messages.InitializeCheckout{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
|
||||||
|
return &messages.OrderCreated{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
|
||||||
|
return &messages.RemoveDelivery{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
|
||||||
|
return &messages.SetDelivery{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
|
||||||
|
return &messages.SetPickupPoint{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
|
||||||
|
return &messages.ClearCartRequest{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(AddVoucher, func() *messages.AddVoucher {
|
||||||
|
return &messages.AddVoucher{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher {
|
||||||
|
return &messages.RemoveVoucher{}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
diskStorage := actor.NewDiskStorage[CartGrain]("data", reg)
|
||||||
|
poolConfig := actor.GrainPoolConfig[CartGrain]{
|
||||||
MutationRegistry: reg,
|
MutationRegistry: reg,
|
||||||
Storage: diskStorage,
|
Storage: diskStorage,
|
||||||
Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) {
|
Spawn: func(id uint64) (actor.Grain[CartGrain], error) {
|
||||||
grainSpawns.Inc()
|
grainSpawns.Inc()
|
||||||
ret := cart.NewCartGrain(id, time.Now())
|
ret := &CartGrain{
|
||||||
|
lastItemId: 0,
|
||||||
|
lastDeliveryId: 0,
|
||||||
|
Deliveries: []*CartDelivery{},
|
||||||
|
Id: CartId(id),
|
||||||
|
Items: []*CartItem{},
|
||||||
|
TotalPrice: NewPrice(),
|
||||||
|
}
|
||||||
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
|
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
|
||||||
|
ret.lastChange = time.Now()
|
||||||
|
ret.lastAccess = time.Now()
|
||||||
|
|
||||||
err := diskStorage.LoadEvents(id, ret)
|
err := diskStorage.LoadEvents(id, ret)
|
||||||
|
|
||||||
@@ -138,27 +175,13 @@ func main() {
|
|||||||
pool: pool,
|
pool: pool,
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := amqp.Dial(amqpUrl)
|
grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool)
|
||||||
if err != nil {
|
|
||||||
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
|
|
||||||
return &CartChangeEvent{
|
|
||||||
CartId: cart.CartId(id),
|
|
||||||
Mutations: msg,
|
|
||||||
}, nil
|
|
||||||
})
|
|
||||||
amqpListener.DefineTopics()
|
|
||||||
pool.AddListener(amqpListener)
|
|
||||||
|
|
||||||
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
|
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
|
||||||
}
|
}
|
||||||
defer grpcSrv.GracefulStop()
|
defer grpcSrv.GracefulStop()
|
||||||
|
|
||||||
// go diskStorage.SaveLoop(10 * time.Second)
|
go diskStorage.SaveLoop(10 * time.Second)
|
||||||
|
|
||||||
go func(hw discovery.Discovery) {
|
go func(hw discovery.Discovery) {
|
||||||
if hw == nil {
|
if hw == nil {
|
||||||
@@ -188,12 +211,12 @@ func main() {
|
|||||||
}
|
}
|
||||||
}(GetDiscovery())
|
}(GetDiscovery())
|
||||||
|
|
||||||
orderHandler := NewAmqpOrderHandler(conn)
|
orderHandler := &AmqpOrderHandler{
|
||||||
orderHandler.DefineTopics()
|
Url: amqpUrl,
|
||||||
|
}
|
||||||
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
|
klarnaClient := NewKlarnaClient(KlarnaPlaygroundUrl, os.Getenv("KLARNA_API_USERNAME"), os.Getenv("KLARNA_API_PASSWORD"))
|
||||||
|
|
||||||
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
|
syncedServer := NewPoolServer(pool, fmt.Sprintf("%s, %s", name, podIp), klarnaClient)
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
|
mux.Handle("/cart/", http.StripPrefix("/cart", syncedServer.Serve()))
|
||||||
// only for local
|
// only for local
|
||||||
@@ -249,14 +272,14 @@ func main() {
|
|||||||
w.Write([]byte("no cart id to checkout is empty"))
|
w.Write([]byte("no cart id to checkout is empty"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
parsed, ok := cart.ParseCartId(cookie.Value)
|
parsed, ok := ParseCartId(cookie.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
w.Write([]byte("invalid cart id format"))
|
w.Write([]byte("invalid cart id format"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cartId := parsed
|
cartId := parsed
|
||||||
syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
|
syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error {
|
||||||
order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
|
order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -410,7 +433,7 @@ func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error
|
|||||||
OrderId: order.ID,
|
OrderId: order.ID,
|
||||||
Status: order.Status,
|
Status: order.Status,
|
||||||
}
|
}
|
||||||
cid, ok := cart.ParseCartId(order.MerchantReference1)
|
cid, ok := ParseCartId(order.MerchantReference1)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
|
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
|
||||||
}
|
}
|
||||||
@@ -424,7 +447,11 @@ func confirmOrder(order *CheckoutOrder, orderHandler *AmqpOrderHandler) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
err = orderHandler.Connect()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer orderHandler.Close()
|
||||||
err = orderHandler.OrderCompleted(orderToSend)
|
err = orderHandler.OrderCompleted(orderToSend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"slices"
|
"slices"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -11,19 +11,18 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/actor"
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/voucher"
|
"git.tornberg.me/go-cart-actor/pkg/voucher"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PoolServer struct {
|
type PoolServer struct {
|
||||||
actor.GrainPool[*cart.CartGrain]
|
actor.GrainPool[*CartGrain]
|
||||||
pod_name string
|
pod_name string
|
||||||
klarnaClient *KlarnaClient
|
klarnaClient *KlarnaClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer {
|
func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer {
|
||||||
return &PoolServer{
|
return &PoolServer{
|
||||||
GrainPool: pool,
|
GrainPool: pool,
|
||||||
pod_name: pod_name,
|
pod_name: pod_name,
|
||||||
@@ -31,11 +30,11 @@ func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarn
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) ApplyLocal(id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) {
|
func (s *PoolServer) ApplyLocal(id CartId, mutation ...proto.Message) (*actor.MutationResult[*CartGrain], error) {
|
||||||
return s.Apply(uint64(id), mutation...)
|
return s.Apply(uint64(id), mutation...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
grain, err := s.Get(uint64(id))
|
grain, err := s.Get(uint64(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -44,7 +43,7 @@ func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id c
|
|||||||
return s.WriteResult(w, grain)
|
return s.WriteResult(w, grain)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
sku := r.PathValue("sku")
|
sku := r.PathValue("sku")
|
||||||
msg, err := GetItemAddMessage(sku, 1, getCountryFromHost(r.Host), nil)
|
msg, err := GetItemAddMessage(sku, 1, getCountryFromHost(r.Host), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -74,7 +73,7 @@ func (s *PoolServer) WriteResult(w http.ResponseWriter, result any) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
|
|
||||||
itemIdString := r.PathValue("itemId")
|
itemIdString := r.PathValue("itemId")
|
||||||
itemId, err := strconv.ParseInt(itemIdString, 10, 64)
|
itemId, err := strconv.ParseInt(itemIdString, 10, 64)
|
||||||
@@ -94,7 +93,7 @@ type SetDeliveryRequest struct {
|
|||||||
PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"`
|
PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
|
|
||||||
delivery := SetDeliveryRequest{}
|
delivery := SetDeliveryRequest{}
|
||||||
err := json.NewDecoder(r.Body).Decode(&delivery)
|
err := json.NewDecoder(r.Body).Decode(&delivery)
|
||||||
@@ -112,7 +111,7 @@ func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request,
|
|||||||
return s.WriteResult(w, data)
|
return s.WriteResult(w, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
|
|
||||||
deliveryIdString := r.PathValue("deliveryId")
|
deliveryIdString := r.PathValue("deliveryId")
|
||||||
deliveryId, err := strconv.ParseInt(deliveryIdString, 10, 64)
|
deliveryId, err := strconv.ParseInt(deliveryIdString, 10, 64)
|
||||||
@@ -139,7 +138,7 @@ func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Reques
|
|||||||
return s.WriteResult(w, reply)
|
return s.WriteResult(w, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
|
|
||||||
deliveryIdString := r.PathValue("deliveryId")
|
deliveryIdString := r.PathValue("deliveryId")
|
||||||
deliveryId, err := strconv.Atoi(deliveryIdString)
|
deliveryId, err := strconv.Atoi(deliveryIdString)
|
||||||
@@ -153,7 +152,7 @@ func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Reques
|
|||||||
return s.WriteResult(w, reply)
|
return s.WriteResult(w, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
changeQuantity := messages.ChangeQuantity{}
|
changeQuantity := messages.ChangeQuantity{}
|
||||||
err := json.NewDecoder(r.Body).Decode(&changeQuantity)
|
err := json.NewDecoder(r.Body).Decode(&changeQuantity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -198,7 +197,7 @@ func getMultipleAddMessages(items []Item, country string) []proto.Message {
|
|||||||
return msgs
|
return msgs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
setCartItems := SetCartItems{}
|
setCartItems := SetCartItems{}
|
||||||
err := json.NewDecoder(r.Body).Decode(&setCartItems)
|
err := json.NewDecoder(r.Body).Decode(&setCartItems)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -216,7 +215,7 @@ func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request,
|
|||||||
return s.WriteResult(w, reply)
|
return s.WriteResult(w, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
setCartItems := SetCartItems{}
|
setCartItems := SetCartItems{}
|
||||||
err := json.NewDecoder(r.Body).Decode(&setCartItems)
|
err := json.NewDecoder(r.Body).Decode(&setCartItems)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -237,7 +236,7 @@ type AddRequest struct {
|
|||||||
StoreId *string `json:"storeId"`
|
StoreId *string `json:"storeId"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error {
|
func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id CartId) error {
|
||||||
addRequest := AddRequest{Quantity: 1}
|
addRequest := AddRequest{Quantity: 1}
|
||||||
err := json.NewDecoder(r.Body).Decode(&addRequest)
|
err := json.NewDecoder(r.Body).Decode(&addRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -288,7 +287,7 @@ func getLocale(country string) string {
|
|||||||
return "sv-se"
|
return "sv-se"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) CreateOrUpdateCheckout(host string, id cart.CartId) (*CheckoutOrder, error) {
|
func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOrder, error) {
|
||||||
country := getCountryFromHost(host)
|
country := getCountryFromHost(host)
|
||||||
meta := &CheckoutMeta{
|
meta := &CheckoutMeta{
|
||||||
Terms: fmt.Sprintf("https://%s/terms", host),
|
Terms: fmt.Sprintf("https://%s/terms", host),
|
||||||
@@ -320,7 +319,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id cart.CartId) (*Check
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id cart.CartId) (*actor.MutationResult[*cart.CartGrain], error) {
|
func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) (*actor.MutationResult[*CartGrain], error) {
|
||||||
// Persist initialization state via mutation (best-effort)
|
// Persist initialization state via mutation (best-effort)
|
||||||
return s.ApplyLocal(id, &messages.InitializeCheckout{
|
return s.ApplyLocal(id, &messages.InitializeCheckout{
|
||||||
OrderId: klarnaOrder.ID,
|
OrderId: klarnaOrder.ID,
|
||||||
@@ -342,12 +341,12 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id cart.Ca
|
|||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
|
|
||||||
func CookieCartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
|
func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
var id cart.CartId
|
var id CartId
|
||||||
cookie, err := r.Cookie("cartid")
|
cookie, err := r.Cookie("cartid")
|
||||||
if err != nil || cookie.Value == "" {
|
if err != nil || cookie.Value == "" {
|
||||||
id = cart.MustNewCartId()
|
id = MustNewCartId()
|
||||||
http.SetCookie(w, &http.Cookie{
|
http.SetCookie(w, &http.Cookie{
|
||||||
Name: "cartid",
|
Name: "cartid",
|
||||||
Value: id.String(),
|
Value: id.String(),
|
||||||
@@ -359,9 +358,9 @@ func CookieCartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *h
|
|||||||
})
|
})
|
||||||
w.Header().Set("Set-Cart-Id", id.String())
|
w.Header().Set("Set-Cart-Id", id.String())
|
||||||
} else {
|
} else {
|
||||||
parsed, ok := cart.ParseCartId(cookie.Value)
|
parsed, ok := ParseCartId(cookie.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
id = cart.MustNewCartId()
|
id = MustNewCartId()
|
||||||
http.SetCookie(w, &http.Cookie{
|
http.SetCookie(w, &http.Cookie{
|
||||||
Name: "cartid",
|
Name: "cartid",
|
||||||
Value: id.String(),
|
Value: id.String(),
|
||||||
@@ -389,7 +388,7 @@ func CookieCartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *h
|
|||||||
|
|
||||||
// Removed leftover legacy block after CookieCartIdHandler (obsolete code referencing cid/legacy)
|
// Removed leftover legacy block after CookieCartIdHandler (obsolete code referencing cid/legacy)
|
||||||
|
|
||||||
func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
|
func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, cartId CartId) error {
|
||||||
// Clear cart cookie (breaking change: do not issue a new legacy id here)
|
// Clear cart cookie (breaking change: do not issue a new legacy id here)
|
||||||
http.SetCookie(w, &http.Cookie{
|
http.SetCookie(w, &http.Cookie{
|
||||||
Name: "cartid",
|
Name: "cartid",
|
||||||
@@ -404,17 +403,17 @@ func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, ca
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
|
func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
var id cart.CartId
|
var id CartId
|
||||||
raw := r.PathValue("id")
|
raw := r.PathValue("id")
|
||||||
// If no id supplied, generate a new one
|
// If no id supplied, generate a new one
|
||||||
if raw == "" {
|
if raw == "" {
|
||||||
id := cart.MustNewCartId()
|
id := MustNewCartId()
|
||||||
w.Header().Set("Set-Cart-Id", id.String())
|
w.Header().Set("Set-Cart-Id", id.String())
|
||||||
} else {
|
} else {
|
||||||
// Parse base62 cart id
|
// Parse base62 cart id
|
||||||
if parsedId, ok := cart.ParseCartId(raw); !ok {
|
if parsedId, ok := ParseCartId(raw); !ok {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
w.Write([]byte("cart id is invalid"))
|
w.Write([]byte("cart id is invalid"))
|
||||||
return
|
return
|
||||||
@@ -432,8 +431,8 @@ func CartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error) func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error {
|
func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error {
|
||||||
return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error {
|
return func(cartId CartId, w http.ResponseWriter, r *http.Request) error {
|
||||||
if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok {
|
if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok {
|
||||||
handled, err := ownerHost.Proxy(uint64(cartId), w, r)
|
handled, err := ownerHost.Proxy(uint64(cartId), w, r)
|
||||||
if err == nil && handled {
|
if err == nil && handled {
|
||||||
@@ -450,7 +449,7 @@ type AddVoucherRequest struct {
|
|||||||
VoucherCode string `json:"code"`
|
VoucherCode string `json:"code"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
|
func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, cartId CartId) error {
|
||||||
data := &AddVoucherRequest{}
|
data := &AddVoucherRequest{}
|
||||||
json.NewDecoder(r.Body).Decode(data)
|
json.NewDecoder(r.Body).Decode(data)
|
||||||
v := voucher.Service{}
|
v := voucher.Service{}
|
||||||
@@ -470,7 +469,7 @@ func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, c
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
|
func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request, cartId CartId) error {
|
||||||
|
|
||||||
idStr := r.PathValue("voucherId")
|
idStr := r.PathValue("voucherId")
|
||||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package cart
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
|
||||||
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
||||||
"github.com/matst80/slask-finder/pkg/index"
|
"github.com/matst80/slask-finder/pkg/index"
|
||||||
)
|
)
|
||||||
@@ -54,19 +53,19 @@ func ToItemAddMessage(item *index.DataItem, storeId *string, qty int, country st
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
stock := cart.StockStatus(0)
|
stock := StockStatus(0)
|
||||||
centralStockValue, ok := item.GetStringFieldValue(3)
|
centralStockValue, ok := item.GetStringFieldValue(3)
|
||||||
if storeId == nil {
|
if storeId == nil {
|
||||||
if ok {
|
if ok {
|
||||||
pureNumber := strings.Replace(centralStockValue, "+", "", -1)
|
pureNumber := strings.Replace(centralStockValue, "+", "", -1)
|
||||||
if centralStock, err := strconv.ParseInt(pureNumber, 10, 64); err == nil {
|
if centralStock, err := strconv.ParseInt(pureNumber, 10, 64); err == nil {
|
||||||
stock = cart.StockStatus(centralStock)
|
stock = StockStatus(centralStock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
storeStock, ok := item.Stock.GetStock()[*storeId]
|
storeStock, ok := item.Stock.GetStock()[*storeId]
|
||||||
if ok {
|
if ok {
|
||||||
stock = cart.StockStatus(storeStock)
|
stock = StockStatus(storeStock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,10 +119,3 @@ func getTax(articleType string) int32 {
|
|||||||
return 2500
|
return 2500
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getInt(data float64, ok bool) (int, error) {
|
|
||||||
if !ok {
|
|
||||||
return 0, fmt.Errorf("invalid type")
|
|
||||||
}
|
|
||||||
return int(data), nil
|
|
||||||
}
|
|
||||||
|
|||||||
BIN
data/1.prot
Normal file
BIN
data/1.prot
Normal file
Binary file not shown.
BIN
data/4.prot
Normal file
BIN
data/4.prot
Normal file
Binary file not shown.
BIN
data/5.prot
Normal file
BIN
data/5.prot
Normal file
Binary file not shown.
BIN
data/state.gob
Normal file
BIN
data/state.gob
Normal file
Binary file not shown.
BIN
data/state.gob.bak
Normal file
BIN
data/state.gob.bak
Normal file
Binary file not shown.
@@ -9,94 +9,6 @@ type: Opaque
|
|||||||
---
|
---
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
app: cart-backoffice
|
|
||||||
arch: amd64
|
|
||||||
name: cart-backoffice-x86
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
app: cart-backoffice
|
|
||||||
arch: amd64
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
app: cart-backoffice
|
|
||||||
arch: amd64
|
|
||||||
spec:
|
|
||||||
affinity:
|
|
||||||
nodeAffinity:
|
|
||||||
requiredDuringSchedulingIgnoredDuringExecution:
|
|
||||||
nodeSelectorTerms:
|
|
||||||
- matchExpressions:
|
|
||||||
- key: kubernetes.io/arch
|
|
||||||
operator: NotIn
|
|
||||||
values:
|
|
||||||
- arm64
|
|
||||||
volumes:
|
|
||||||
- name: data
|
|
||||||
nfs:
|
|
||||||
path: /i-data/7a8af061/nfs/cart-actor
|
|
||||||
server: 10.10.1.10
|
|
||||||
imagePullSecrets:
|
|
||||||
- name: regcred
|
|
||||||
serviceAccountName: default
|
|
||||||
containers:
|
|
||||||
- image: registry.knatofs.se/go-cart-actor-amd64:latest
|
|
||||||
name: cart-actor-amd64
|
|
||||||
imagePullPolicy: Always
|
|
||||||
command: ["/go-cart-backoffice"]
|
|
||||||
lifecycle:
|
|
||||||
preStop:
|
|
||||||
exec:
|
|
||||||
command: ["sleep", "15"]
|
|
||||||
ports:
|
|
||||||
- containerPort: 8080
|
|
||||||
name: web
|
|
||||||
livenessProbe:
|
|
||||||
httpGet:
|
|
||||||
path: /livez
|
|
||||||
port: web
|
|
||||||
failureThreshold: 1
|
|
||||||
periodSeconds: 10
|
|
||||||
readinessProbe:
|
|
||||||
httpGet:
|
|
||||||
path: /readyz
|
|
||||||
port: web
|
|
||||||
failureThreshold: 2
|
|
||||||
initialDelaySeconds: 2
|
|
||||||
periodSeconds: 10
|
|
||||||
volumeMounts:
|
|
||||||
- mountPath: "/data"
|
|
||||||
name: data
|
|
||||||
resources:
|
|
||||||
limits:
|
|
||||||
memory: "768Mi"
|
|
||||||
requests:
|
|
||||||
memory: "70Mi"
|
|
||||||
cpu: "1200m"
|
|
||||||
env:
|
|
||||||
- name: TZ
|
|
||||||
value: "Europe/Stockholm"
|
|
||||||
- name: KLARNA_API_USERNAME
|
|
||||||
valueFrom:
|
|
||||||
secretKeyRef:
|
|
||||||
name: klarna-api-credentials
|
|
||||||
key: username
|
|
||||||
- name: KLARNA_API_PASSWORD
|
|
||||||
valueFrom:
|
|
||||||
secretKeyRef:
|
|
||||||
name: klarna-api-credentials
|
|
||||||
key: password
|
|
||||||
- name: AMQP_URL
|
|
||||||
value: "amqp://admin:12bananer@rabbitmq.dev:5672/"
|
|
||||||
# - name: BASE_URL
|
|
||||||
# value: "https://s10n-no.tornberg.me"
|
|
||||||
---
|
|
||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
metadata:
|
||||||
labels:
|
labels:
|
||||||
app: cart-actor
|
app: cart-actor
|
||||||
@@ -310,17 +222,6 @@ spec:
|
|||||||
- name: web
|
- name: web
|
||||||
port: 8080
|
port: 8080
|
||||||
---
|
---
|
||||||
kind: Service
|
|
||||||
apiVersion: v1
|
|
||||||
metadata:
|
|
||||||
name: cart-backoffice
|
|
||||||
spec:
|
|
||||||
selector:
|
|
||||||
app: cart-backoffice
|
|
||||||
ports:
|
|
||||||
- name: web
|
|
||||||
port: 8080
|
|
||||||
---
|
|
||||||
apiVersion: networking.k8s.io/v1
|
apiVersion: networking.k8s.io/v1
|
||||||
kind: Ingress
|
kind: Ingress
|
||||||
metadata:
|
metadata:
|
||||||
@@ -349,27 +250,3 @@ spec:
|
|||||||
name: cart-actor
|
name: cart-actor
|
||||||
port:
|
port:
|
||||||
number: 8080
|
number: 8080
|
||||||
---
|
|
||||||
apiVersion: networking.k8s.io/v1
|
|
||||||
kind: Ingress
|
|
||||||
metadata:
|
|
||||||
name: cart-backend-ingress
|
|
||||||
annotations:
|
|
||||||
cert-manager.io/cluster-issuer: letsencrypt-prod
|
|
||||||
spec:
|
|
||||||
ingressClassName: nginx
|
|
||||||
tls:
|
|
||||||
- hosts:
|
|
||||||
- slask-cart.tornberg.me
|
|
||||||
secretName: cart-backoffice-actor-tls-secret
|
|
||||||
rules:
|
|
||||||
- host: slask-cart.tornberg.me
|
|
||||||
http:
|
|
||||||
paths:
|
|
||||||
- path: /
|
|
||||||
pathType: Prefix
|
|
||||||
backend:
|
|
||||||
service:
|
|
||||||
name: cart-backoffice
|
|
||||||
port:
|
|
||||||
number: 8080
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ type DiskStorage[V any] struct {
|
|||||||
|
|
||||||
type LogStorage[V any] interface {
|
type LogStorage[V any] interface {
|
||||||
LoadEvents(id uint64, grain Grain[V]) error
|
LoadEvents(id uint64, grain Grain[V]) error
|
||||||
AppendMutations(id uint64, msg ...proto.Message) error
|
AppendEvent(id uint64, msg ...proto.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] {
|
||||||
@@ -108,7 +108,7 @@ func (s *DiskStorage[V]) Close() {
|
|||||||
close(s.done)
|
close(s.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DiskStorage[V]) AppendMutations(id uint64, msg ...proto.Message) error {
|
func (s *DiskStorage[V]) AppendEvent(id uint64, msg ...proto.Message) error {
|
||||||
if s.queue != nil {
|
if s.queue != nil {
|
||||||
queue := make([]QueueEvent, 0)
|
queue := make([]QueueEvent, 0)
|
||||||
data, found := s.queue.Load(id)
|
data, found := s.queue.Load(id)
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
// It delegates to a grain pool and cluster operations to a synced pool.
|
// It delegates to a grain pool and cluster operations to a synced pool.
|
||||||
type ControlServer[V any] struct {
|
type ControlServer[V any] struct {
|
||||||
messages.UnimplementedControlPlaneServer
|
messages.UnimplementedControlPlaneServer
|
||||||
|
|
||||||
pool GrainPool[V]
|
pool GrainPool[V]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,47 +0,0 @@
|
|||||||
package actor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/matst80/slask-finder/pkg/messaging"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
type LogListener interface {
|
|
||||||
AppendMutations(id uint64, msg ...ApplyResult)
|
|
||||||
}
|
|
||||||
|
|
||||||
type AmqpListener struct {
|
|
||||||
conn *amqp.Connection
|
|
||||||
transformer func(id uint64, msg []ApplyResult) (any, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener {
|
|
||||||
return &AmqpListener{
|
|
||||||
conn: conn,
|
|
||||||
transformer: transformer,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) DefineTopics() {
|
|
||||||
ch, err := l.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to open a channel: %v", err)
|
|
||||||
}
|
|
||||||
defer ch.Close()
|
|
||||||
if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil {
|
|
||||||
log.Fatalf("Failed to declare topic mutation: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) {
|
|
||||||
data, err := l.transformer(id, msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to transform mutation event: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = messaging.SendChange(l.conn, "cart", "mutation", data)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to send mutation event: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -17,7 +17,6 @@ type SimpleGrainPool[V any] struct {
|
|||||||
mutationRegistry MutationRegistry
|
mutationRegistry MutationRegistry
|
||||||
spawn func(id uint64) (Grain[V], error)
|
spawn func(id uint64) (Grain[V], error)
|
||||||
spawnHost func(host string) (Host, error)
|
spawnHost func(host string) (Host, error)
|
||||||
listeners []LogListener
|
|
||||||
storage LogStorage[V]
|
storage LogStorage[V]
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
poolSize int
|
poolSize int
|
||||||
@@ -67,19 +66,6 @@ func NewSimpleGrainPool[V any](config GrainPoolConfig[V]) (*SimpleGrainPool[V],
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SimpleGrainPool[V]) AddListener(listener LogListener) {
|
|
||||||
p.listeners = append(p.listeners, listener)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *SimpleGrainPool[V]) RemoveListener(listener LogListener) {
|
|
||||||
for i, l := range p.listeners {
|
|
||||||
if l == listener {
|
|
||||||
p.listeners = append(p.listeners[:i], p.listeners[i+1:]...)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *SimpleGrainPool[V]) purge() {
|
func (p *SimpleGrainPool[V]) purge() {
|
||||||
purgeLimit := time.Now().Add(-p.ttl)
|
purgeLimit := time.Now().Add(-p.ttl)
|
||||||
purgedIds := make([]uint64, 0, len(p.grains))
|
purgedIds := make([]uint64, 0, len(p.grains))
|
||||||
@@ -397,14 +383,11 @@ func (p *SimpleGrainPool[V]) Apply(id uint64, mutation ...proto.Message) (*Mutat
|
|||||||
}
|
}
|
||||||
if p.storage != nil {
|
if p.storage != nil {
|
||||||
go func() {
|
go func() {
|
||||||
if err := p.storage.AppendMutations(id, mutation...); err != nil {
|
if err := p.storage.AppendEvent(id, mutation...); err != nil {
|
||||||
log.Printf("failed to store mutation for grain %d: %v", id, err)
|
log.Printf("failed to store mutation for grain %d: %v", id, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
for _, listener := range p.listeners {
|
|
||||||
go listener.AppendMutations(id, mutations...)
|
|
||||||
}
|
|
||||||
result, err := grain.GetCurrentState()
|
result, err := grain.GetCurrentState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -1,48 +0,0 @@
|
|||||||
package cart
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.tornberg.me/go-cart-actor/pkg/actor"
|
|
||||||
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewCartMultationRegistry() actor.MutationRegistry {
|
|
||||||
|
|
||||||
reg := actor.NewMutationRegistry()
|
|
||||||
reg.RegisterMutations(
|
|
||||||
actor.NewMutation(AddItem, func() *messages.AddItem {
|
|
||||||
return &messages.AddItem{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
|
|
||||||
return &messages.ChangeQuantity{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
|
|
||||||
return &messages.RemoveItem{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
|
|
||||||
return &messages.InitializeCheckout{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
|
|
||||||
return &messages.OrderCreated{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
|
|
||||||
return &messages.RemoveDelivery{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
|
|
||||||
return &messages.SetDelivery{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
|
|
||||||
return &messages.SetPickupPoint{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
|
|
||||||
return &messages.ClearCartRequest{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(AddVoucher, func() *messages.AddVoucher {
|
|
||||||
return &messages.AddVoucher{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher {
|
|
||||||
return &messages.RemoveVoucher{}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
return reg
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -21,6 +21,7 @@ type Voucher struct {
|
|||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
// Add fields here
|
// Add fields here
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrInvalidCode = errors.New("invalid vouchercode")
|
var ErrInvalidCode = errors.New("invalid vouchercode")
|
||||||
|
|||||||
Reference in New Issue
Block a user