Skip to main content

Install EMQ-X MQTT Broker (v4.3)

Install Supporting Software

If installing as a single server with integrated database authentication, install the following:

Install MariaDB

Install Apache (Optional)

Install PHP 7.4 (Optional)

Install phpMyAdmin (Optional)

Install EMQ-X

wget https://www.emqx.com/en/downloads/broker/v4.3.16/emqx-ubuntu20.04-4.3.16-amd64.deb
sudo apt install ./emqx-ubuntu20.04-4.3.16-amd64.deb

Configure EMQ-X

Edit the EMQ-X configuration to accommodate the proxy:

sudo nano /etc/emqx/emqx.conf

Find the following section and edit as appropriate for the desired node name:

## Node name.
##
## See: http://erlang.org/doc/reference_manual/distributed.html
##
## Value: <name>@<host>
##
## Default: emqx@127.0.0.1
node.name = emqx@192.168.1.91

Configure EMQ-X for HAProxy

If installing behind a reverse proxy such as HAProxy you will need the following similar configuration inside your HAProxy server configuration:

Make sure to adjust accordingly for SSL and IP address

If clustering, configure the servers similar to this:

frontend mqtt_fe
        mode tcp
        option tcplog

        # Don't allow unencrypted connections
        #bind *:1883
        bind *:8883 ssl crt /etc/ssl/private

        # Reject connections that have an invalid MQTT packet
        tcp-request inspect-delay 10s
        tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
        default_backend mqtt_be

backend mqtt_be
        mode tcp
        balance leastconn

        # Create a stick table for session persistence
        stick-table type string len 32 size 100k expire 30m

        # Use ClientID / client_identifier as persistence key
        stick on req.payload(0,0),mqtt_field_value(connect,client_identifier)

        server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
        server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5

If running multiple individual servers modify it to be similar to this:

frontend mqtt_fe
        mode tcp
        option tcplog

        # Don't allow unencrypted connections
        #bind *:1883
        bind *:8883 ssl crt /etc/ssl/private

        # Reject connections that have an invalid MQTT packet
        tcp-request inspect-delay 10s
        tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
        
        acl mqtt1 ssl_fc_sni -i mqtt1.icstexas.com
        acl mqtt2 ssl_fc_sni -i mqtt2.icstexas.com

        use_backend mqtt1_be if mqtt1
        use_backend mqtt2_be if mqtt2

        default_backend mqtt_be

backend mqtt1_be
        mode tcp

        server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5

backend mqtt2_be
        mode tcp

        server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5

Next, edit the EMQ-X configuration to accommodate the proxy:

sudo nano /etc/emqx/emqx.conf

Find the following section which will have the setting commented and uncomment the setting to look as follows:

## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed
## behind HAProxy or Nginx.
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## Value: on | off
listener.tcp.external.proxy_protocol = on

For a cluster, find the following section and set the discovery mode to static, list all the nodes that will be part of the cluster, and make sure the node cookie is set to some secure password that matches on all cluster nodes:

## Cluster auto-discovery strategy.
##
## Value: Enum
## - manual: Manual join command
## - static: Static node list
## - mcast:  IP Multicast
## - dns:    DNS A Record
## - etcd:   etcd
## - k8s:    Kubernetes
##
## Default: manual
cluster.discovery = static

##--------------------------------------------------------------------
## Cluster using static node list

## Node list of the cluster.
##
## Value: String
cluster.static.seeds = emqx@192.168.144.91,emqx@192.168.144.92

## Cookie for distributed node communication.
##
## Value: String
node.cookie = SuperSecretClusterPassword

Configure EMQ-X for Database Authentication using MySQL/MariaDB

Edit the EMQ-X Plugin configuration for MySQL:

sudo nano /etc/emqx/plugins/emqx_auth_mysql.conf

Edit the following sections according to your login information:

## MySQL server address.
##
## Value: Port | IP:Port
##
## Examples: 3306, 127.0.0.1:3306, localhost:3306
auth.mysql.server = 127.0.0.1:3306

## MySQL username.
##
## Value: String
auth.mysql.username = mqtt

## MySQL password.
##
## Value: String
auth.mysql.password = password

## MySQL database.
##
## Value: String
auth.mysql.database = mqtt

Change the following sections to set up passwords to be stored using salted and hashed passwords:

Make sure to comment out the existing line which only uses hashing with sha256!

## Authentication query.
##
## Note that column names should be 'password' and 'salt' (if used).
## In case column names differ in your DB - please use aliases,
## e.g. "my_column_name as password".
##
## Value: SQL
##
## Variables:
##  - %u: username
##  - %c: clientid
##  - %C: common name of client TLS cert
##  - %d: subject of client TLS cert
##
auth.mysql.auth_query = select password, salt from mqtt_user where username = '%u' limit 1
## auth.mysql.auth_query = select password_hash as password from mqtt_user where username = '%u' limit 1

## Password hash.
##
## Value: plain | md5 | sha | sha256 | bcrypt
## auth.mysql.password_hash = sha256

## sha256 with salt prefix
## auth.mysql.password_hash = salt,sha256

## bcrypt with salt only prefix
## auth.mysql.password_hash = salt,bcrypt

## sha256 with salt suffix
auth.mysql.password_hash = sha256,salt

Next, edit the EMQ-X configuration to adjust for deny by default security:

sudo nano /etc/emqx/emqx.conf

Find the following section which will have the setting commented and uncomment the setting to look as follows:

## Allow anonymous authentication by default if no auth plugins loaded.
## Notice: Disable the option in production deployment!
##
## Value: true | false
allow_anonymous = false

## Allow or deny if no ACL rules matched.
##
## Value: allow | deny
acl_nomatch = deny

Even when using those settings, we still need to edit the fallback ACL file that's checked prior to using the acl_nomatch entry. Edit this ACL file:

sudo nano /etc/emqx/acl.conf

Change the last line to deny everything else (by default it's set to allow all) or comment it out with double % signs.

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.

{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.

{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

%% {allow, all}.
{deny, all}.

Finally force EMQX to reload this ACL file with the following command:

sudo emqx_ctl modules reload emqx_mod_acl_internal

Using phpMyAdmin or any other SQL tool, create the User and ACL table in the mqtt database with the following SQL commands:

USE mqtt;

CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT current_timestamp(),
  `client_group` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `mqtt_acl` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `allow` int(1) DEFAULT 1 COMMENT '0: deny, 1: allow',
  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IP Address',
  `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  `clientid` varchar(100) DEFAULT NULL COMMENT 'Client ID',
  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Generate logins using the following SQL command substituting username and password as necessary:

SET @salt=MD5(RAND());
INSERT INTO `mqtt_user` ( `username`, `password`, `salt`) VALUES ('MyUsername', SHA2(CONCAT('MyPassword123', @salt),0), @salt);

Grant ACL permissions using the following SQL command substituting the username, and topic as necessary:

ACL rule table field description:

  • allow: Deny (0), Allow (1)
  • ipaddr: Set IP address
  • username: User name for connecting to the client. If the value is set to $all, the rule applies to all users.
  • clientid: Client ID of the connected client
  • access: Allowed operations: subscribe (1), publish (2), both subscribe and publish (3)
  • topic: Topics to be controlled, which can use wildcards, and placeholders can be added to the topic to match client information. For example, the topic will be replaced with the client ID of the current client when matching t/%c
    • %u: Username
    • %c: Client ID
-- All users cannot subscribe to system topics
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '$SYS/#');

-- Allow clients on 10.59.1.100 to subscribe to system topics
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, '10.59.1.100', NULL, NULL, 1, '$SYS/#');

-- Deny client to subscribe to the topic of /smarthome/+/temperature
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '/smarthome/+/temperature');

-- Allow clients to subscribe to the topic of /smarthome/${clientid}/temperature with their own Client ID
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, '$all', NULL, 1, '/smarthome/%c/temperature');

For an easier way, the following stored procedures make adding/removing users and ACLs much easier:

DELIMITER $$
CREATE PROCEDURE `sp_addSuperUser`(IN `newUser` VARCHAR(50), IN `newPass` VARCHAR(50))
    NO SQL
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE newUser);
    IF @count = 0 THEN
    	-- Generate random salt and create new user
		SET @salt = MD5(RAND());
    	INSERT INTO mqtt_user (`username`, `password`, `salt`, `is_superuser`) VALUES (newUser, SHA2(CONCAT(newPass,@salt), 0), @salt, 1);
	END IF;
END$$
DELIMITER ;

DELIMITER $$
CREATE PROCEDURE `sp_addEdgeUser`(IN `ClientGroup` VARCHAR(50), IN `newUser` VARCHAR(50), IN `newPass` VARCHAR(50))
    NO SQL
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE newUser);
    IF @count = 0 THEN
    	-- Generate random salt and create new user
		SET @salt = MD5(RAND());
    	INSERT INTO mqtt_user (`username`, `password`, `salt`, `client_group`) VALUES (newUser, SHA2(CONCAT(newPass,@salt), 0), @salt, ClientGroup);
		
        -- Make sure ACL doesn't exist already for pub/sub to own node data
        SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('spBv1.0/',ClientGroup,'/+/%u'));
    	IF @count = 0 THEN
        	-- Generate ACL
        	INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, '$all', NULL, 3, CONCAT('spBv1.0/',ClientGroup,'/+/%u'));
    	END IF;
		
        -- Make sure ACL doesn't exist already for pub/sub to own device data
        SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('spBv1.0/',ClientGroup,'/+/%u/#'));
    	IF @count = 0 THEN
        	-- Generate ACL
        	INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, '$all', NULL, 3, CONCAT('spBv1.0/',ClientGroup,'/+/%u/#'));
    	END IF;

	END IF;
END$$
DELIMITER ;

DELIMITER $$
CREATE PROCEDURE `sp_addAppIDUser`(IN `newAppID` VARCHAR(50), IN `newPass` VARCHAR(50))
    MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE newAppID);
    IF @count = 0 THEN
    	-- Generate random salt and create new user
		SET @salt = MD5(RAND());
    	INSERT INTO mqtt_user (`username`, `password`, `salt`) VALUES (newAppID, SHA2(CONCAT(newPass,@salt), 0), @salt);
		
        -- Make sure ACL doesn't exist already for all users to sub to this AppID state
        SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('STATE/',newAppID));
    	IF @count = 0 THEN
        	-- Generate ACL
    		INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, '$all', NULL, 1, CONCAT('STATE/',newAppID));
    	END IF;

        -- Make sure ACL doesn't exist already for this user to sub to all Sparkplug B topics
        SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = 'spBv1.0/#');
    	IF @count = 0 THEN
        	-- Generate ACL
    		INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, newAppID, NULL, 1, 'spBv1.0/#');
    	END IF;

        -- Make sure ACL doesn't exist already for pub/sub to own AppID state
        SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = CONCAT('STATE/',newAppID));
    	IF @count = 0 THEN
        	-- Generate ACL
    		INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, newAppID, NULL, 3, CONCAT('STATE/',newAppID));
    	END IF;
    END IF;
END$$
DELIMITER ;

DELIMITER $$
CREATE PROCEDURE `sp_changePassword`(IN `username` VARCHAR(50), IN `newPass` VARCHAR(50))
    MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user exists already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE BINARY mqtt_user.username = username);
    IF @count = 1 THEN
    	-- Generate random salt and create new user
		SET @salt = MD5(RAND());
    	UPDATE mqtt_user SET `password` = SHA2(CONCAT(newPass,@salt), 0), `salt` = @salt WHERE BINARY mqtt_user.username = username;

    END IF;
END$$
DELIMITER ;

DELIMITER $$
CREATE PROCEDURE `sp_removeUser`(IN `username` VARCHAR(50))
    MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user exists already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE BINARY mqtt_user.username = username);
    IF @count = 1 THEN
    	-- Generate check to see if it has a client group
		SET @clientgroup = (SELECT client_group FROM mqtt_user WHERE BINARY mqtt_user.username = username);
		IF @clientgroup IS NULL THEN
			-- Delete ACL records first
			DELETE FROM mqtt_acl WHERE BINARY mqtt_acl.username = username;
			DELETE FROM mqtt_acl WHERE BINARY mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('STATE/',username);
		ELSE
			-- Check to make sure the client group isn't used anywhere else
			SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE BINARY mqtt_user.client_group = @clientgroup);
			-- If only 1 found, then this user is the only user using this group
			IF @count = 1 THEN
				-- Delete ACL records first
				DELETE FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic LIKE CONCAT('spBv1.0/',@clientgroup,'/%');
			END IF;
		END IF;

		-- Delete user record last
		DELETE FROM mqtt_user WHERE BINARY mqtt_user.username = username;
    END IF;
END$$
DELIMITER ;

Start EMQ-X

sudo service emqx start

Login to EMQ-X

Open your browser to the URL for the server: http://<IPAddr>:18083

The username is admin and the password is public

Be sure to change the password immediately!

Go to the plugins and enable the emqx_auth_mysql plugin.