This commit is contained in:
@@ -83,7 +83,7 @@ func (p *SyncedPool) NegotiateHandler(data []byte) (uint32, []byte, error) {
|
|||||||
negotiationCount.Inc()
|
negotiationCount.Inc()
|
||||||
log.Printf("Handling negotiation\n")
|
log.Printf("Handling negotiation\n")
|
||||||
for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
|
for _, host := range p.ExcludeKnown(strings.Split(string(data), ";")) {
|
||||||
p.AddRemote(host)
|
go p.AddRemote(host)
|
||||||
}
|
}
|
||||||
|
|
||||||
return RemoteNegotiateResponse, []byte("ok"), nil
|
return RemoteNegotiateResponse, []byte("ok"), nil
|
||||||
@@ -111,14 +111,14 @@ func (p *SyncedPool) GrainOwnerChangeHandler(data []byte) (uint32, []byte, error
|
|||||||
grain, err := NewRemoteGrain(id, r.Host)
|
grain, err := NewRemoteGrain(id, r.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Owner change failed %s: %v\n", id, err)
|
log.Printf("Owner change failed %s: %v\n", id, err)
|
||||||
return AckChange, []byte("error"), nil
|
return AckChange, []byte("error"), err
|
||||||
}
|
}
|
||||||
p.remoteIndex[id] = grain
|
p.remoteIndex[id] = grain
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
return AckChange, []byte("ok"), nil
|
return AckChange, []byte("ok"), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return AckChange, []byte("not found"), nil
|
return AckChange, []byte{}, fmt.Errorf("remote host not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
|
func NewSyncedPool(local *GrainLocalPool, hostname string, discovery Discovery) (*SyncedPool, error) {
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ type CartServer struct {
|
|||||||
func CartListen(address string) (*CartServer, error) {
|
func CartListen(address string) (*CartServer, error) {
|
||||||
listener, err := net.Listen("tcp", address)
|
listener, err := net.Listen("tcp", address)
|
||||||
server := &CartServer{
|
server := &CartServer{
|
||||||
NewCartTCPServerMux(100),
|
NewCartTCPServerMux(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -40,7 +40,7 @@ type TCPCartServerMux struct {
|
|||||||
functions map[uint32]func(CartId, []byte) (uint32, []byte, error)
|
functions map[uint32]func(CartId, []byte) (uint32, []byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCartTCPServerMux(maxClients int) *TCPCartServerMux {
|
func NewCartTCPServerMux() *TCPCartServerMux {
|
||||||
m := &TCPCartServerMux{
|
m := &TCPCartServerMux{
|
||||||
mu: sync.RWMutex{},
|
mu: sync.RWMutex{},
|
||||||
listeners: make(map[uint32]func(CartId, []byte) error),
|
listeners: make(map[uint32]func(CartId, []byte) error),
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ type Server struct {
|
|||||||
func Listen(address string) (*Server, error) {
|
func Listen(address string) (*Server, error) {
|
||||||
listener, err := net.Listen("tcp", address)
|
listener, err := net.Listen("tcp", address)
|
||||||
server := &Server{
|
server := &Server{
|
||||||
NewTCPServerMux(100),
|
NewTCPServerMux(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -40,7 +40,7 @@ type TCPServerMux struct {
|
|||||||
functions map[uint32]func(data []byte) (uint32, []byte, error)
|
functions map[uint32]func(data []byte) (uint32, []byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTCPServerMux(maxClients int) *TCPServerMux {
|
func NewTCPServerMux() *TCPServerMux {
|
||||||
m := &TCPServerMux{
|
m := &TCPServerMux{
|
||||||
mu: sync.RWMutex{},
|
mu: sync.RWMutex{},
|
||||||
listeners: make(map[uint32]func(data []byte) error),
|
listeners: make(map[uint32]func(data []byte) error),
|
||||||
|
|||||||
Reference in New Issue
Block a user