79179198

Date: 2024-11-11 21:39:19
Score: 1
Natty:
Report link

After @hardillb answer, I realized that, indeed, the client was not authorized to subscribe to that topic, but another connection issue was happening. After a couple of attempts to make the code work, finally I came up with a solution that was capable of connecting to the IoT Core MQTT broker instance, publishing and subscribing to that topic, and receiving the message. This is the working code.

import (
    "bufio"
    "crypto/ecdsa"
    "crypto/rsa"
    "crypto/tls"
    "crypto/x509"
    "encoding/pem"
    "fmt"
    "log"
    "net"
    "net/http"
    "net/url"
    "time"

    "golang.org/x/net/proxy"

    "os"
    "os/signal"
    "syscall"

    MQTT "github.com/eclipse/paho.mqtt.golang"
)

type TlsCerts struct {
    IotPrivateKey     string
    IotCertificatePem string
    CaCertificatePem  string
    AlnpProtocols     []string
}

type Config struct {
    ClientId  string
    BrokerUrl string
    TlsCerts  TlsCerts
}

type httpProxy struct {
    host     string
    haveAuth bool
    username string
    password string
    forward  proxy.Dialer
}

func parseTlsConfig(tlsCerts TlsCerts) *tls.Config {
    if tlsCerts.IotPrivateKey == "" || tlsCerts.IotCertificatePem == "" {
        return nil
    }

    cert := parseTlsCertificates(tlsCerts)

    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM([]byte(AmazonRootCA1Cert))

    return &tls.Config{
        RootCAs:            caCertPool,
        Certificates:       []tls.Certificate{cert},
        InsecureSkipVerify: false,
        NextProtos:         tlsCerts.AlnpProtocols,
        ServerName:         "iot.customdomain.io",
    }
}

func parseTlsCertificates(
    tlsCerts TlsCerts,
) tls.Certificate {
    block, _ := pem.Decode([]byte(tlsCerts.IotPrivateKey))
    if block == nil {

        log.Panic("Failed to parse private key")

    }

    var key interface{}
    var err error

    key, err = x509.ParsePKCS1PrivateKey(block.Bytes)
    if err != nil {
        key, err = x509.ParsePKCS8PrivateKey(block.Bytes)
        if err != nil {
            log.Panicf("Failed to parse private key: %v", err)
        }

        switch k := key.(type) {
        case *rsa.PrivateKey:
            key = k
        case *ecdsa.PrivateKey:
            key = k
        default:
            log.Panicf("Unsupported private key type: %T", key)
        }
    }

    block, _ = pem.Decode([]byte(tlsCerts.IotCertificatePem))
    if block == nil {
        log.Panic("Failed to parse certificate")
    }
    cert, err := x509.ParseCertificate(block.Bytes)
    if err != nil {
        log.Panicf("Failed to parse certificate: %v", err)
    }

    return tls.Certificate{
        PrivateKey:  key,
        Certificate: [][]byte{cert.Raw},
    }
}

func (s httpProxy) String() string {
    return fmt.Sprintf("HTTP proxy dialer for %s", s.host)
}

func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
    s := new(httpProxy)
    s.host = uri.Host
    s.forward = forward
    if uri.User != nil {
        s.haveAuth = true
        s.username = uri.User.Username()
        s.password, _ = uri.User.Password()
    }

    return s, nil
}

func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
    reqURL := url.URL{
        Scheme: "https",
        Host:   addr,
    }

    req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
    if err != nil {
        return nil, err
    }
    req.Close = false
    if s.haveAuth {
        req.SetBasicAuth(s.username, s.password)
    }
    req.Header.Set("User-Agent", "paho.mqtt")

    // Dial and create the client connection.
    c, err := s.forward.Dial("tcp", s.host)
    if err != nil {
        return nil, err
    }

    err = req.Write(c)
    if err != nil {
        _ = c.Close()
        return nil, err
    }

    resp, err := http.ReadResponse(bufio.NewReader(c), req)
    if err != nil {
        _ = c.Close()
        return nil, err
    }
    _ = resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        _ = c.Close()
        return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
    }
    TlsCerts := TlsCerts{
        IotPrivateKey:     IotPrivateKey,
        IotCertificatePem: IotCertificatePem,
        AlnpProtocols:     []string{"mqtt", "x-amzn-mqtt-ca"},
    }
    tlsConfig := parseTlsConfig(TlsCerts)

    tlsConn := tls.Client(c, tlsConfig)

    return tlsConn, nil
}

func init() {
    // Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment
    proxy.RegisterDialerType("http", newHTTPProxy)
    proxy.RegisterDialerType("https", newHTTPProxy)
}

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    fmt.Printf("Received message on topic: %s\n", message.Topic())
    fmt.Printf("Message: %s\n", message.Payload())
}

var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    fmt.Println("Received message on topic: " + msg.Topic())
    ProcessMessage(msg.Payload())
}

func ProcessMessage(payload []byte) {
    fmt.Println(string(payload))
}

func MainFunc() {
    MQTT.DEBUG = log.New(os.Stdout, "", 0)
    MQTT.ERROR = log.New(os.Stderr, "", 0)

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    server := "https://iot.customdomain.io:443"
    topic := "right/topic/now"
    qos := 0
    clientid := "my-client-id"

    os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128"))
    defer os.Unsetenv("ALL_PROXY")

    connOpts := MQTT.NewClientOptions().AddBroker(server).
        SetClientID(clientid).
        SetCleanSession(true).
        SetProtocolVersion(4)

    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic, byte(qos), onMessageReceived); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        }

        text := `{"message": "Hello MQTT"}`
        token := c.Publish(topic, byte(qos), false, text)
        token.Wait()
    }

    dialer := proxy.FromEnvironment()
    connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
        fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
        address := uri.Host
        return dialer.Dial(uri.Scheme, address)
    })

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fmt.Printf("Connected to %s\n", server)

    time.Sleep(1 * time.Second)

    fmt.Println("Disconnecting")
    client.Disconnect(250)
    fmt.Println("Exiting")
}

A lot of stuff changed like the removal of the SetUsername and the SetPassword calls since I am already authenticating via certificates and private key.

This was removed as well:

connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
        cfg := tlsCfg.Clone()
        cfg.ServerName = broker.Hostname()
        return cfg
    }

Apparently, the previous code was a complete mess, so there were a lot of issues, not only one issue, but one of those issues was exactly the problem if the lack of authorization for that specific topic. But the code above is capable of connecting to IoT Core through a Tinyproxy instance running on the localhost.

Reasons:
  • Long answer (-1):
  • Has code block (-0.5):
  • User mentioned (1): @hardillb
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: tgdva