should work better
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m48s
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m48s
This commit is contained in:
@@ -163,6 +163,7 @@ const (
|
|||||||
|
|
||||||
type PacketWithData struct {
|
type PacketWithData struct {
|
||||||
MessageType uint16
|
MessageType uint16
|
||||||
|
Added time.Time
|
||||||
Data []byte
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,34 +183,52 @@ func NewPacketQueue(connection net.Conn) *PacketQueue {
|
|||||||
messageType, data, err := ReceivePacket(queue.connection)
|
messageType, data, err := ReceivePacket(queue.connection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error receiving packet: %v\n", err)
|
log.Printf("Error receiving packet: %v\n", err)
|
||||||
return
|
//return
|
||||||
}
|
}
|
||||||
|
log.Printf("Received packet %d\n", messageType)
|
||||||
queue.mu.Lock()
|
queue.mu.Lock()
|
||||||
queue.Packets = append(queue.Packets, PacketWithData{
|
queue.Packets = append(queue.Packets, PacketWithData{
|
||||||
MessageType: messageType,
|
MessageType: messageType,
|
||||||
|
Added: time.Now(),
|
||||||
Data: data,
|
Data: data,
|
||||||
})
|
})
|
||||||
queue.mu.Unlock()
|
queue.mu.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
go func(queueTimer *time.Ticker) {
|
||||||
|
for {
|
||||||
|
<-queueTimer.C
|
||||||
|
queue.mu.Lock()
|
||||||
|
for i, packet := range queue.Packets {
|
||||||
|
if time.Since(packet.Added) > time.Second*5 {
|
||||||
|
queue.Packets = append(queue.Packets[:i], queue.Packets[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
queue.mu.Unlock()
|
||||||
|
}
|
||||||
|
}(time.NewTicker(time.Second))
|
||||||
return queue
|
return queue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (PacketWithData, error) {
|
func (p *PacketQueue) Expect(messageType uint16, timeToWait time.Duration) (*PacketWithData, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
for {
|
for {
|
||||||
if time.Since(start) > timeToWait {
|
if time.Since(start) > timeToWait {
|
||||||
return PacketWithData{}, fmt.Errorf("timeout waiting for message type %d", messageType)
|
return nil, fmt.Errorf("timeout waiting for message type %d", messageType)
|
||||||
}
|
}
|
||||||
for i, packet := range p.Packets {
|
for i, packet := range p.Packets {
|
||||||
if packet.MessageType == messageType {
|
if packet.MessageType == messageType && packet.Added.After(start) {
|
||||||
|
toReturn := PacketWithData{
|
||||||
|
MessageType: packet.MessageType,
|
||||||
|
Data: packet.Data,
|
||||||
|
}
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
p.Packets = append(p.Packets[:i], p.Packets[i+1:]...)
|
p.Packets = append(p.Packets[:i], p.Packets[i+1:]...)
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
return packet, nil
|
return &toReturn, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(time.Millisecond * 50)
|
time.Sleep(time.Millisecond * 5)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -243,13 +262,6 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
|
|||||||
knownHosts := strings.Split(string(data), ";")
|
knownHosts := strings.Split(string(data), ";")
|
||||||
log.Printf("Negotiated with remote, found %v hosts\n", knownHosts)
|
log.Printf("Negotiated with remote, found %v hosts\n", knownHosts)
|
||||||
|
|
||||||
for _, h := range knownHosts {
|
|
||||||
err = p.AddRemoteWithConnection(h, conn)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error adding remote %s: %v\n", h, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SendPacket(conn, RemoteNegotiate, func(w io.Writer) error {
|
SendPacket(conn, RemoteNegotiate, func(w io.Writer) error {
|
||||||
hostnames := make([]string, 0, len(p.remotes))
|
hostnames := make([]string, 0, len(p.remotes))
|
||||||
for _, r := range p.remotes {
|
for _, r := range p.remotes {
|
||||||
@@ -258,6 +270,12 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
|
|||||||
w.Write([]byte(strings.Join(hostnames, ";")))
|
w.Write([]byte(strings.Join(hostnames, ";")))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
for _, h := range knownHosts {
|
||||||
|
err = p.AddRemote(h)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error adding remote %s: %v\n", h, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
case RemoteGrainChanged:
|
case RemoteGrainChanged:
|
||||||
// remote grain changed
|
// remote grain changed
|
||||||
grainSyncCount.Inc()
|
grainSyncCount.Inc()
|
||||||
@@ -298,10 +316,13 @@ func (p *SyncedPool) handleConnection(conn net.Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
|
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
|
||||||
SendPacket(h.connection, RemoteNegotiate, func(w io.Writer) error {
|
err := SendPacket(h.connection, RemoteNegotiate, func(w io.Writer) error {
|
||||||
w.Write([]byte(strings.Join(knownHosts, ";")))
|
w.Write([]byte(strings.Join(knownHosts, ";")))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
packet, err := h.queue.Expect(RemoteNegotiate, time.Second)
|
packet, err := h.queue.Expect(RemoteNegotiate, time.Second)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
|
|
||||||
func TestConnection(t *testing.T) {
|
func TestConnection(t *testing.T) {
|
||||||
// TestConnection tests the connection to the server
|
// TestConnection tests the connection to the server
|
||||||
t.Log("Testing connection to server")
|
|
||||||
localPool := NewGrainLocalPool(100, time.Minute, func(id CartId) (*CartGrain, error) {
|
localPool := NewGrainLocalPool(100, time.Minute, func(id CartId) (*CartGrain, error) {
|
||||||
return &CartGrain{
|
return &CartGrain{
|
||||||
Id: id,
|
Id: id,
|
||||||
|
|||||||
Reference in New Issue
Block a user