After @hardillb answer, I realized that, indeed, the client was not authorized to subscribe to that topic, but another connection issue was happening. After a couple of attempts to make the code work, finally I came up with a solution that was capable of connecting to the IoT Core MQTT broker instance, publishing and subscribing to that topic, and receiving the message. This is the working code.
import (
"bufio"
"crypto/ecdsa"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"log"
"net"
"net/http"
"net/url"
"time"
"golang.org/x/net/proxy"
"os"
"os/signal"
"syscall"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type TlsCerts struct {
IotPrivateKey string
IotCertificatePem string
CaCertificatePem string
AlnpProtocols []string
}
type Config struct {
ClientId string
BrokerUrl string
TlsCerts TlsCerts
}
type httpProxy struct {
host string
haveAuth bool
username string
password string
forward proxy.Dialer
}
func parseTlsConfig(tlsCerts TlsCerts) *tls.Config {
if tlsCerts.IotPrivateKey == "" || tlsCerts.IotCertificatePem == "" {
return nil
}
cert := parseTlsCertificates(tlsCerts)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(AmazonRootCA1Cert))
return &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: false,
NextProtos: tlsCerts.AlnpProtocols,
ServerName: "iot.customdomain.io",
}
}
func parseTlsCertificates(
tlsCerts TlsCerts,
) tls.Certificate {
block, _ := pem.Decode([]byte(tlsCerts.IotPrivateKey))
if block == nil {
log.Panic("Failed to parse private key")
}
var key interface{}
var err error
key, err = x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
key, err = x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
log.Panicf("Failed to parse private key: %v", err)
}
switch k := key.(type) {
case *rsa.PrivateKey:
key = k
case *ecdsa.PrivateKey:
key = k
default:
log.Panicf("Unsupported private key type: %T", key)
}
}
block, _ = pem.Decode([]byte(tlsCerts.IotCertificatePem))
if block == nil {
log.Panic("Failed to parse certificate")
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
log.Panicf("Failed to parse certificate: %v", err)
}
return tls.Certificate{
PrivateKey: key,
Certificate: [][]byte{cert.Raw},
}
}
func (s httpProxy) String() string {
return fmt.Sprintf("HTTP proxy dialer for %s", s.host)
}
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
s := new(httpProxy)
s.host = uri.Host
s.forward = forward
if uri.User != nil {
s.haveAuth = true
s.username = uri.User.Username()
s.password, _ = uri.User.Password()
}
return s, nil
}
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
reqURL := url.URL{
Scheme: "https",
Host: addr,
}
req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
if err != nil {
return nil, err
}
req.Close = false
if s.haveAuth {
req.SetBasicAuth(s.username, s.password)
}
req.Header.Set("User-Agent", "paho.mqtt")
// Dial and create the client connection.
c, err := s.forward.Dial("tcp", s.host)
if err != nil {
return nil, err
}
err = req.Write(c)
if err != nil {
_ = c.Close()
return nil, err
}
resp, err := http.ReadResponse(bufio.NewReader(c), req)
if err != nil {
_ = c.Close()
return nil, err
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
_ = c.Close()
return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
}
TlsCerts := TlsCerts{
IotPrivateKey: IotPrivateKey,
IotCertificatePem: IotCertificatePem,
AlnpProtocols: []string{"mqtt", "x-amzn-mqtt-ca"},
}
tlsConfig := parseTlsConfig(TlsCerts)
tlsConn := tls.Client(c, tlsConfig)
return tlsConn, nil
}
func init() {
// Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment
proxy.RegisterDialerType("http", newHTTPProxy)
proxy.RegisterDialerType("https", newHTTPProxy)
}
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
fmt.Printf("Received message on topic: %s\n", message.Topic())
fmt.Printf("Message: %s\n", message.Payload())
}
var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println("Received message on topic: " + msg.Topic())
ProcessMessage(msg.Payload())
}
func ProcessMessage(payload []byte) {
fmt.Println(string(payload))
}
func MainFunc() {
MQTT.DEBUG = log.New(os.Stdout, "", 0)
MQTT.ERROR = log.New(os.Stderr, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
server := "https://iot.customdomain.io:443"
topic := "right/topic/now"
qos := 0
clientid := "my-client-id"
os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128"))
defer os.Unsetenv("ALL_PROXY")
connOpts := MQTT.NewClientOptions().AddBroker(server).
SetClientID(clientid).
SetCleanSession(true).
SetProtocolVersion(4)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(topic, byte(qos), onMessageReceived); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
text := `{"message": "Hello MQTT"}`
token := c.Publish(topic, byte(qos), false, text)
token.Wait()
}
dialer := proxy.FromEnvironment()
connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
address := uri.Host
return dialer.Dial(uri.Scheme, address)
})
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
fmt.Printf("Connected to %s\n", server)
time.Sleep(1 * time.Second)
fmt.Println("Disconnecting")
client.Disconnect(250)
fmt.Println("Exiting")
}
A lot of stuff changed like the removal of the SetUsername
and the SetPassword
calls since I am already authenticating via certificates and private key.
This was removed as well:
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
cfg := tlsCfg.Clone()
cfg.ServerName = broker.Hostname()
return cfg
}
Apparently, the previous code was a complete mess, so there were a lot of issues, not only one issue, but one of those issues was exactly the problem if the lack of authorization for that specific topic. But the code above is capable of connecting to IoT Core through a Tinyproxy instance running on the localhost.