diff --git a/main.go b/main.go index 4a63451..df9180c 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "syscall" "time" - messages "git.tornberg.me/go-cart-actor/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -85,57 +84,6 @@ func (a *App) HandleSave(w http.ResponseWriter, r *http.Request) { } } -type PoolServer struct { - pod_name string - pool GrainPool -} - -func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { - return &PoolServer{ - pod_name: pod_name, - pool: pool, - } -} - -func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - data, err := s.pool.Get(ToCartId(id)) - if err != nil { - w.WriteHeader(http.StatusNotFound) - w.Write([]byte(err.Error())) - return - } - w.Header().Set("Content-Type", "application/json") - w.Header().Set("X-Pod-Name", s.pod_name) - w.WriteHeader(http.StatusOK) - w.Write(data) -} - -func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) { - id := r.PathValue("id") - sku := r.PathValue("sku") - data, err := s.pool.Process(ToCartId(id), Message{ - Type: AddRequestType, - Content: &messages.AddRequest{Sku: sku}, - }) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) - return - } - w.Header().Set("Content-Type", "application/json") - w.Header().Set("X-Pod-Name", s.pod_name) - w.WriteHeader(http.StatusOK) - w.Write(data) -} - -func (s *PoolServer) Serve() *http.ServeMux { - mux := http.NewServeMux() - mux.HandleFunc("GET /{id}", s.HandleGet) - mux.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku) - return mux -} - var podIp = os.Getenv("POD_IP") var name = os.Getenv("POD_NAME") diff --git a/pool-server.go b/pool-server.go new file mode 100644 index 0000000..3ac54d1 --- /dev/null +++ b/pool-server.go @@ -0,0 +1,58 @@ +package main + +import ( + "net/http" + + messages "git.tornberg.me/go-cart-actor/proto" +) + +type PoolServer struct { + pod_name string + pool GrainPool +} + +func NewPoolServer(pool GrainPool, pod_name string) *PoolServer { + return &PoolServer{ + pod_name: pod_name, + pool: pool, + } +} + +func (s *PoolServer) HandleGet(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + data, err := s.pool.Get(ToCartId(id)) + if err != nil { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Pod-Name", s.pod_name) + w.WriteHeader(http.StatusOK) + w.Write(data) +} + +func (s *PoolServer) HandleAddSku(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + sku := r.PathValue("sku") + data, err := s.pool.Process(ToCartId(id), Message{ + Type: AddRequestType, + Content: &messages.AddRequest{Sku: sku}, + }) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Pod-Name", s.pod_name) + w.WriteHeader(http.StatusOK) + w.Write(data) +} + +func (s *PoolServer) Serve() *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("GET /{id}", s.HandleGet) + mux.HandleFunc("GET /{id}/add/{sku}", s.HandleAddSku) + return mux +} diff --git a/product-fetcher.go b/product-fetcher.go index bab199b..e9c1421 100644 --- a/product-fetcher.go +++ b/product-fetcher.go @@ -17,7 +17,7 @@ type PriceTuple struct { } type OutletItem struct { - ArticleNumber string `json:"sku,opmitempty"` + ArticleNumber string `json:"sku,omitempty"` Price PriceTuple `json:"price,omitempty"` Title string `json:"title"` } diff --git a/rpc-pool.go b/rpc-pool.go index 2f0e49b..f01764b 100644 --- a/rpc-pool.go +++ b/rpc-pool.go @@ -24,19 +24,19 @@ func ToCartId(id string) CartId { type RemoteGrain struct { *CartClient - Id CartId - Address string + Id CartId + Host string } -func NewRemoteGrain(id CartId, address string) (*RemoteGrain, error) { - client, err := CartDial(fmt.Sprintf("%s:1337", address)) +func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) { + client, err := CartDial(fmt.Sprintf("%s:1337", host)) if err != nil { return nil, err } return &RemoteGrain{ Id: id, - Address: address, + Host: host, CartClient: client, }, nil } @@ -75,7 +75,7 @@ func (p *RemoteGrainPool) findRemoteGrain(id CartId) *RemoteGrain { p.mu.RLock() grain, ok := p.grains[id] p.mu.RUnlock() - if !ok || grain == nil { + if !ok { return nil } return grain diff --git a/rpc-server.go b/rpc-server.go index d828452..8b881c6 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -48,55 +48,9 @@ func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint1 } func (h *GrainHandler) RemoteGetStateHandler(id CartId, data []byte) (uint16, []byte, error) { - data, err := h.pool.Get(id) + reply, err := h.pool.Get(id) if err != nil { return RemoteGetStateReply, nil, err } - return RemoteGetStateReply, data, nil + return RemoteGetStateReply, reply, nil } - -// func (h *GrainHandler) handleClient(conn net.Conn) { -// var err error - -// defer conn.Close() - -// var packet CartPacket - -// for { -// err = binary.Read(conn, binary.LittleEndian, &packet) -// if err != nil { -// if err == io.EOF { -// break -// } -// fmt.Println("Error in connection:", err) -// } -// if packet.Version != 2 { -// fmt.Printf("Unknown version %d", packet.Version) -// break -// } - -// switch packet.MessageType { -// case RemoteHandleMessage: -// var msg Message -// err = ReadMessage(conn, &msg) -// if err != nil { -// fmt.Println("Error reading message:", err) -// } - -// data, err := h.pool.Process(packet.Id, msg) -// if err != nil { -// fmt.Println("Error handling message:", err) -// } -// SendRawResponse(conn, data) - -// case RemoteGetState: -// data, err := h.pool.Get(packet.Id) -// if err != nil { -// fmt.Println("Error getting grain:", err) -// } -// SendRawResponse(conn, data) -// } - -// } - -// } diff --git a/synced-pool.go b/synced-pool.go index 29411f9..2e7f52a 100644 --- a/synced-pool.go +++ b/synced-pool.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "log" "strings" @@ -25,12 +26,11 @@ type RemoteHost struct { type SyncedPool struct { *Server - mu sync.RWMutex - //Discovery Discovery + mu sync.RWMutex Hostname string local *GrainLocalPool remotes []*RemoteHost - remoteIndex map[CartId]*RemoteGrainPool + remoteIndex map[CartId]*RemoteGrain } var ( @@ -97,16 +97,23 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint16, []byte, error log.Printf("Invalid remote grain change message\n") return AckChange, []byte("incorrect"), nil } + id := ToCartId(idAndHostParts[0]) + host := idAndHostParts[1] for _, r := range p.remotes { - if r.Host == string(idAndHostParts[1]) { - log.Printf("Remote grain %s changed to %s\n", idAndHostParts[0], idAndHostParts[1]) + if r.Host == host { + log.Printf("Remote grain %s changed to %s\n", id, host) p.mu.Lock() - if p.local.grains[ToCartId(idAndHostParts[0])] != nil { - log.Printf("Grain %s already exists locally, deleting\n", idAndHostParts[0]) - delete(p.local.grains, ToCartId(idAndHostParts[0])) + if p.local.grains[id] != nil { + log.Printf("Grain %s already exists locally, deleting\n", id) + delete(p.local.grains, id) } - p.remoteIndex[ToCartId(idAndHostParts[0])] = r.Pool + grain, err := NewRemoteGrain(id, r.Host) + if err != nil { + log.Printf("Error creating remote grain %s: %v\n", id, err) + return AckChange, []byte("error"), nil + } + p.remoteIndex[id] = grain p.mu.Unlock() return AckChange, []byte("ok"), nil } @@ -131,7 +138,7 @@ func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) local: local, remotes: make([]*RemoteHost, 0), - remoteIndex: make(map[CartId]*RemoteGrainPool), + remoteIndex: make(map[CartId]*RemoteGrain), } server.HandleCall(Ping, pool.PongHandler) @@ -227,8 +234,7 @@ func (p *SyncedPool) RemoveHostMappedCarts(host *RemoteHost) { p.mu.Lock() defer p.mu.Unlock() for id, r := range p.remoteIndex { - if r == host.Pool { - p.remoteIndex[id].Delete(id) + if r.Host == host.Host { delete(p.remoteIndex, id) } } @@ -340,7 +346,12 @@ func (p *SyncedPool) addRemoteHost(address string, remote *RemoteHost) error { log.Printf("Grain %s already exists locally, deleting\n", id) delete(p.local.grains, id) } - p.remoteIndex[id] = remote.Pool + grain, err := NewRemoteGrain(id, remote.Host) + if err != nil { + log.Printf("Error creating remote grain %s: %v\n", id, err) + continue + } + p.remoteIndex[id] = grain } p.mu.Unlock() }() @@ -376,47 +387,59 @@ func (p *SyncedPool) AddRemote(address string) error { return p.addRemoteHost(address, &remote) } -func (p *SyncedPool) getGrainPool(id CartId) (GrainPool, error) { - _, ok := p.local.grains[id] +func (p *SyncedPool) getGrain(id CartId) (Grain, error) { + localGrain, ok := p.local.grains[id] if !ok { // check if remote grain exists p.mu.RLock() - remotePool, ok := p.remoteIndex[id] + remoteGrain, ok := p.remoteIndex[id] p.mu.RUnlock() if ok { - if remotePool == nil { - p.remoteIndex[id].Delete(id) + if remoteGrain == nil { + // p.remoteIndex[id].Delete(id) p.mu.Lock() delete(p.remoteIndex, id) p.mu.Unlock() - return nil, fmt.Errorf("remote pool is nil for %v", id) - } - remoteLookupCount.Inc() - return remotePool, nil - } - if !ok { - err := p.RequestOwnership(id) - if err != nil { - return nil, err + log.Printf("Remote grain %s is nil\n", id) + //return nil, fmt.Errorf("remote pool is nil for %v", id) + } else { + remoteLookupCount.Inc() + return remoteGrain, nil } } + err := p.RequestOwnership(id) + if err != nil { + return nil, err + } + } - return p.local, nil + return localGrain, nil } func (p *SyncedPool) Process(id CartId, messages ...Message) ([]byte, error) { - pool, err := p.getGrainPool(id) + pool, err := p.getGrain(id) + var res []byte if err != nil { return nil, err } - return pool.Process(id, messages...) + for _, m := range messages { + res, err = pool.HandleMessage(&m, false) + if err != nil { + return nil, err + } + } + return res, nil } func (p *SyncedPool) Get(id CartId) ([]byte, error) { - pool, err := p.getGrainPool(id) + grain, err := p.getGrain(id) if err != nil { return nil, err } - return pool.Get(id) + if remoteGrain, ok := grain.(*RemoteGrain); ok { + return remoteGrain.GetCurrentState() + } + + return json.Marshal(grain) } diff --git a/tcp-cart-mux-server.go b/tcp-cart-mux-server.go index 40ebe9d..b5a89ea 100644 --- a/tcp-cart-mux-server.go +++ b/tcp-cart-mux-server.go @@ -89,11 +89,11 @@ func (m *TCPCartServerMux) handleFunction(connection net.Conn, messageType uint1 } func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error { - var packet *CartPacket + var packet CartPacket var err error defer connection.Close() for { - err = ReadPacket(connection, packet) + err = ReadPacket(connection, &packet) if err != nil { if err == io.EOF { return nil @@ -101,10 +101,7 @@ func (m *TCPCartServerMux) HandleConnection(connection net.Conn) error { log.Printf("Error receiving packet: %v\n", err) return err } - if packet == nil { - log.Println("Packet is nil") - continue - } + data, err := GetPacketData(connection, int(packet.DataLength)) if err != nil { log.Printf("Error getting packet data: %v\n", err)