package mqtt import ( "app/pkg/datastore" "app/pkg/telldus" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" ) // Client handles all MQTT operations type Client struct { client mqtt.Client store *datastore.DataStore subscriptions []string } // Config holds MQTT connection configuration type Config struct { BrokerURL string Username string Password string } // New creates a new MQTT client func New(cfg Config, store *datastore.DataStore) (*Client, error) { opts := mqtt.NewClientOptions().AddBroker(cfg.BrokerURL) opts.SetUsername(cfg.Username) opts.SetPassword(cfg.Password) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return nil, token.Error() } return &Client{ client: client, store: store, }, nil } // Close disconnects the MQTT client func (c *Client) Close() { c.client.Disconnect(250) } // PublishDeviceState publishes a device state change func (c *Client) PublishDeviceState(deviceID int, state string) { topic := fmt.Sprintf("telldus/device/%d/state", deviceID) c.client.Publish(topic, 0, false, state) } // PublishSensorValue publishes a sensor value func (c *Client) PublishSensorValue(protocol, model string, id int, dataType int, value string) { var topic string switch dataType { case telldus.DataTypeTemperature: topic = fmt.Sprintf("telldus/sensor/%s/%s/%d/temperature", protocol, model, id) case telldus.DataTypeHumidity: topic = fmt.Sprintf("telldus/sensor/%s/%s/%d/humidity", protocol, model, id) default: return } c.client.Publish(topic, 0, false, value) } // UnsubscribeFromDeviceCommands unsubscribes from all tracked command topics func (c *Client) UnsubscribeFromDeviceCommands() { if len(c.subscriptions) > 0 { if token := c.client.Unsubscribe(c.subscriptions...); token.Wait() && token.Error() != nil { // Log error but don't fail } c.subscriptions = nil } } // SubscribeToDeviceCommands subscribes to command topics for all devices func (c *Client) SubscribeToDeviceCommands() error { // Unsubscribe from existing subscriptions first c.UnsubscribeFromDeviceCommands() numDevices := telldus.GetNumberOfDevices() for i := 0; i < numDevices; i++ { deviceID := telldus.GetDeviceId(i) topic := fmt.Sprintf("telldus/device/%d/set", deviceID) // Capture deviceID in closure id := deviceID c.client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) { payload := string(msg.Payload()) if payload == "ON" { telldus.TurnOn(id) } else if payload == "OFF" { telldus.TurnOff(id) } }) // Track subscription c.subscriptions = append(c.subscriptions, topic) } return nil }