feature/backoffice #6
@@ -11,6 +11,9 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileServer struct {
|
type FileServer struct {
|
||||||
@@ -122,6 +125,12 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"})
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// reconstruct state from event log if present
|
||||||
|
grain := cart.NewCartGrain(id, time.Now())
|
||||||
|
if globalDisk != nil {
|
||||||
|
_ = globalDisk.LoadEvents(id, grain)
|
||||||
|
}
|
||||||
|
|
||||||
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
||||||
info, err := os.Stat(path)
|
info, err := os.Stat(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -139,6 +148,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
"id": id,
|
"id": id,
|
||||||
|
"state": grain,
|
||||||
"rawLog": lines,
|
"rawLog": lines,
|
||||||
"meta": map[string]any{
|
"meta": map[string]any{
|
||||||
"size": info.Size(),
|
"size": info.Size(),
|
||||||
|
|||||||
248
cmd/backoffice/hub.go
Normal file
248
cmd/backoffice/hub.go
Normal file
@@ -0,0 +1,248 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"crypto/sha1"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hub manages websocket clients and broadcasts messages to them.
|
||||||
|
type Hub struct {
|
||||||
|
register chan *Client
|
||||||
|
unregister chan *Client
|
||||||
|
broadcast chan []byte
|
||||||
|
clients map[*Client]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client represents a single websocket client connection.
|
||||||
|
type Client struct {
|
||||||
|
hub *Hub
|
||||||
|
conn net.Conn
|
||||||
|
send chan []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHub constructs a new Hub instance.
|
||||||
|
func NewHub() *Hub {
|
||||||
|
return &Hub{
|
||||||
|
register: make(chan *Client),
|
||||||
|
unregister: make(chan *Client),
|
||||||
|
broadcast: make(chan []byte, 1024),
|
||||||
|
clients: make(map[*Client]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the hub event loop.
|
||||||
|
func (h *Hub) Run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case c := <-h.register:
|
||||||
|
h.clients[c] = true
|
||||||
|
case c := <-h.unregister:
|
||||||
|
if _, ok := h.clients[c]; ok {
|
||||||
|
delete(h.clients, c)
|
||||||
|
close(c.send)
|
||||||
|
_ = c.conn.Close()
|
||||||
|
}
|
||||||
|
case msg := <-h.broadcast:
|
||||||
|
for c := range h.clients {
|
||||||
|
select {
|
||||||
|
case c.send <- msg:
|
||||||
|
default:
|
||||||
|
// Client is slow or dead; drop it.
|
||||||
|
delete(h.clients, c)
|
||||||
|
close(c.send)
|
||||||
|
_ = c.conn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// computeAccept computes the Sec-WebSocket-Accept header value.
|
||||||
|
func computeAccept(key string) string {
|
||||||
|
const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||||
|
h := sha1.New()
|
||||||
|
h.Write([]byte(key + magic))
|
||||||
|
return base64.StdEncoding.EncodeToString(h.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeWS upgrades the HTTP request to a WebSocket connection and registers a client.
|
||||||
|
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
|
||||||
|
http.Error(w, "upgrade required", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := r.Header.Get("Sec-WebSocket-Key")
|
||||||
|
if key == "" {
|
||||||
|
http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
accept := computeAccept(key)
|
||||||
|
|
||||||
|
hj, ok := w.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
http.Error(w, "websocket not supported", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
conn, buf, err := hj.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the upgrade response
|
||||||
|
response := "HTTP/1.1 101 Switching Protocols\r\n" +
|
||||||
|
"Upgrade: websocket\r\n" +
|
||||||
|
"Connection: Upgrade\r\n" +
|
||||||
|
"Sec-WebSocket-Accept: " + accept + "\r\n" +
|
||||||
|
"\r\n"
|
||||||
|
if _, err := buf.WriteString(response); err != nil {
|
||||||
|
_ = conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := buf.Flush(); err != nil {
|
||||||
|
_ = conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &Client{
|
||||||
|
hub: h,
|
||||||
|
conn: conn,
|
||||||
|
send: make(chan []byte, 256),
|
||||||
|
}
|
||||||
|
h.register <- client
|
||||||
|
go client.writePump()
|
||||||
|
go client.readPump()
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeWSFrame writes a single WebSocket frame to the writer.
|
||||||
|
func writeWSFrame(w io.Writer, opcode byte, payload []byte) error {
|
||||||
|
// FIN set, opcode as provided
|
||||||
|
header := []byte{0x80 | (opcode & 0x0F)}
|
||||||
|
l := len(payload)
|
||||||
|
switch {
|
||||||
|
case l < 126:
|
||||||
|
header = append(header, byte(l))
|
||||||
|
case l <= 65535:
|
||||||
|
ext := make([]byte, 2)
|
||||||
|
binary.BigEndian.PutUint16(ext, uint16(l))
|
||||||
|
header = append(header, 126)
|
||||||
|
header = append(header, ext...)
|
||||||
|
default:
|
||||||
|
ext := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(ext, uint64(l))
|
||||||
|
header = append(header, 127)
|
||||||
|
header = append(header, ext...)
|
||||||
|
}
|
||||||
|
if _, err := w.Write(header); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if l > 0 {
|
||||||
|
if _, err := w.Write(payload); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// readPump handles control frames from the client and discards other incoming frames.
|
||||||
|
// This server is broadcast-only to clients.
|
||||||
|
func (c *Client) readPump() {
|
||||||
|
defer func() {
|
||||||
|
c.hub.unregister <- c
|
||||||
|
}()
|
||||||
|
reader := bufio.NewReader(c.conn)
|
||||||
|
for {
|
||||||
|
// Read first two bytes
|
||||||
|
b1, err := reader.ReadByte()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b2, err := reader.ReadByte()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opcode := b1 & 0x0F
|
||||||
|
masked := (b2 & 0x80) != 0
|
||||||
|
length := int64(b2 & 0x7F)
|
||||||
|
if length == 126 {
|
||||||
|
ext := make([]byte, 2)
|
||||||
|
if _, err := io.ReadFull(reader, ext); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
length = int64(binary.BigEndian.Uint16(ext))
|
||||||
|
} else if length == 127 {
|
||||||
|
ext := make([]byte, 8)
|
||||||
|
if _, err := io.ReadFull(reader, ext); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
length = int64(binary.BigEndian.Uint64(ext))
|
||||||
|
}
|
||||||
|
var maskKey [4]byte
|
||||||
|
if masked {
|
||||||
|
if _, err := io.ReadFull(reader, maskKey[:]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle Ping -> Pong
|
||||||
|
if opcode == 0x9 && length <= 125 {
|
||||||
|
payload := make([]byte, length)
|
||||||
|
if _, err := io.ReadFull(reader, payload); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Unmask if masked
|
||||||
|
if masked {
|
||||||
|
for i := int64(0); i < length; i++ {
|
||||||
|
payload[i] ^= maskKey[i%4]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close frame
|
||||||
|
if opcode == 0x8 {
|
||||||
|
// Drain payload if any, then exit
|
||||||
|
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// For other frames, just discard payload
|
||||||
|
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// writePump sends queued messages to the client and pings periodically to keep the connection alive.
|
||||||
|
func (c *Client) writePump() {
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
_ = c.conn.Close()
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-c.send:
|
||||||
|
if !ok {
|
||||||
|
// try to send close frame
|
||||||
|
_ = writeWSFrame(c.conn, 0x8, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := writeWSFrame(c.conn, 0x1, msg); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
// Send a ping to keep connections alive behind proxies
|
||||||
|
_ = writeWSFrame(c.conn, 0x9, []byte("ping"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,23 +1,17 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha1"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -37,230 +31,24 @@ func envOrDefault(key, def string) string {
|
|||||||
|
|
||||||
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
||||||
|
|
||||||
// WebSocket hub to broadcast live mutation events to connected clients.
|
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
||||||
type Hub struct {
|
|
||||||
register chan *Client
|
|
||||||
unregister chan *Client
|
|
||||||
broadcast chan []byte
|
|
||||||
clients map[*Client]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type Client struct {
|
func buildRegistry() actor.MutationRegistry {
|
||||||
hub *Hub
|
reg := actor.NewMutationRegistry()
|
||||||
conn net.Conn
|
reg.RegisterMutations(
|
||||||
send chan []byte
|
actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }),
|
||||||
}
|
actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }),
|
||||||
|
actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }),
|
||||||
func NewHub() *Hub {
|
actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }),
|
||||||
return &Hub{
|
actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }),
|
||||||
register: make(chan *Client),
|
actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }),
|
||||||
unregister: make(chan *Client),
|
actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }),
|
||||||
broadcast: make(chan []byte, 1024),
|
actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }),
|
||||||
clients: make(map[*Client]bool),
|
actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }),
|
||||||
}
|
actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }),
|
||||||
}
|
actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }),
|
||||||
|
)
|
||||||
func (h *Hub) Run() {
|
return reg
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case c := <-h.register:
|
|
||||||
h.clients[c] = true
|
|
||||||
case c := <-h.unregister:
|
|
||||||
if _, ok := h.clients[c]; ok {
|
|
||||||
delete(h.clients, c)
|
|
||||||
close(c.send)
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}
|
|
||||||
case msg := <-h.broadcast:
|
|
||||||
for c := range h.clients {
|
|
||||||
select {
|
|
||||||
case c.send <- msg:
|
|
||||||
default:
|
|
||||||
// Client is slow or dead; drop it.
|
|
||||||
delete(h.clients, c)
|
|
||||||
close(c.send)
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func computeAccept(key string) string {
|
|
||||||
const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
|
||||||
h := sha1.New()
|
|
||||||
h.Write([]byte(key + magic))
|
|
||||||
return base64.StdEncoding.EncodeToString(h.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
|
|
||||||
http.Error(w, "upgrade required", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := r.Header.Get("Sec-WebSocket-Key")
|
|
||||||
if key == "" {
|
|
||||||
http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accept := computeAccept(key)
|
|
||||||
|
|
||||||
hj, ok := w.(http.Hijacker)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "websocket not supported", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
conn, buf, err := hj.Hijack()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the upgrade response
|
|
||||||
response := "HTTP/1.1 101 Switching Protocols\r\n" +
|
|
||||||
"Upgrade: websocket\r\n" +
|
|
||||||
"Connection: Upgrade\r\n" +
|
|
||||||
"Sec-WebSocket-Accept: " + accept + "\r\n" +
|
|
||||||
"\r\n"
|
|
||||||
if _, err := buf.WriteString(response); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buf.Flush(); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &Client{
|
|
||||||
hub: h,
|
|
||||||
conn: conn,
|
|
||||||
send: make(chan []byte, 256),
|
|
||||||
}
|
|
||||||
h.register <- client
|
|
||||||
go client.writePump()
|
|
||||||
go client.readPump()
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeWSFrame(w io.Writer, opcode byte, payload []byte) error {
|
|
||||||
// FIN set, opcode as provided
|
|
||||||
header := []byte{0x80 | (opcode & 0x0F)}
|
|
||||||
l := len(payload)
|
|
||||||
switch {
|
|
||||||
case l < 126:
|
|
||||||
header = append(header, byte(l))
|
|
||||||
case l <= 65535:
|
|
||||||
ext := make([]byte, 2)
|
|
||||||
binary.BigEndian.PutUint16(ext, uint16(l))
|
|
||||||
header = append(header, 126)
|
|
||||||
header = append(header, ext...)
|
|
||||||
default:
|
|
||||||
ext := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint64(ext, uint64(l))
|
|
||||||
header = append(header, 127)
|
|
||||||
header = append(header, ext...)
|
|
||||||
}
|
|
||||||
if _, err := w.Write(header); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if l > 0 {
|
|
||||||
if _, err := w.Write(payload); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) readPump() {
|
|
||||||
defer func() {
|
|
||||||
c.hub.unregister <- c
|
|
||||||
}()
|
|
||||||
reader := bufio.NewReader(c.conn)
|
|
||||||
for {
|
|
||||||
// Read first two bytes
|
|
||||||
b1, err := reader.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b2, err := reader.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
opcode := b1 & 0x0F
|
|
||||||
masked := (b2 & 0x80) != 0
|
|
||||||
length := int64(b2 & 0x7F)
|
|
||||||
if length == 126 {
|
|
||||||
ext := make([]byte, 2)
|
|
||||||
if _, err := io.ReadFull(reader, ext); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
length = int64(binary.BigEndian.Uint16(ext))
|
|
||||||
} else if length == 127 {
|
|
||||||
ext := make([]byte, 8)
|
|
||||||
if _, err := io.ReadFull(reader, ext); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
length = int64(binary.BigEndian.Uint64(ext))
|
|
||||||
}
|
|
||||||
var maskKey [4]byte
|
|
||||||
if masked {
|
|
||||||
if _, err := io.ReadFull(reader, maskKey[:]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read payload
|
|
||||||
if opcode == 0x9 && length <= 125 { // Ping -> respond with Pong
|
|
||||||
payload := make([]byte, length)
|
|
||||||
if _, err := io.ReadFull(reader, payload); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Unmask if masked
|
|
||||||
if masked {
|
|
||||||
for i := int64(0); i < length; i++ {
|
|
||||||
payload[i] ^= maskKey[i%4]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close frame
|
|
||||||
if opcode == 0x8 {
|
|
||||||
// Drain payload if any, then exit
|
|
||||||
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// For other frames, just discard payload
|
|
||||||
if _, err := io.CopyN(io.Discard, reader, length); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) writePump() {
|
|
||||||
ticker := time.NewTicker(30 * time.Second)
|
|
||||||
defer func() {
|
|
||||||
ticker.Stop()
|
|
||||||
_ = c.conn.Close()
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg, ok := <-c.send:
|
|
||||||
if !ok {
|
|
||||||
// try to send close frame
|
|
||||||
_ = writeWSFrame(c.conn, 0x8, nil)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := writeWSFrame(c.conn, 0x1, msg); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-ticker.C:
|
|
||||||
// Send a ping to keep connections alive behind proxies
|
|
||||||
_ = writeWSFrame(c.conn, 0x9, []byte("ping"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error {
|
func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error {
|
||||||
@@ -346,6 +134,9 @@ func main() {
|
|||||||
|
|
||||||
_ = os.MkdirAll(dataDir, 0755)
|
_ = os.MkdirAll(dataDir, 0755)
|
||||||
|
|
||||||
|
reg := buildRegistry()
|
||||||
|
globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
|
||||||
|
|
||||||
fs := NewFileServer(dataDir)
|
fs := NewFileServer(dataDir)
|
||||||
|
|
||||||
hub := NewHub()
|
hub := NewHub()
|
||||||
@@ -383,20 +174,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir)
|
||||||
log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir)
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
log.Fatalf("http server error: %v", err)
|
||||||
log.Fatalf("http server error: %v", err)
|
}
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
// server stopped
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
<-sigs
|
|
||||||
log.Printf("shutting down...")
|
|
||||||
|
|
||||||
shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel2()
|
|
||||||
_ = srv.Shutdown(shutdownCtx)
|
|
||||||
cancel()
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ metadata:
|
|||||||
arch: amd64
|
arch: amd64
|
||||||
name: cart-backoffice-x86
|
name: cart-backoffice-x86
|
||||||
spec:
|
spec:
|
||||||
replicas: 3
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
matchLabels:
|
matchLabels:
|
||||||
app: cart-backoffice
|
app: cart-backoffice
|
||||||
|
|||||||
Reference in New Issue
Block a user