Install EMQ-X MQTT Broker (v5.x)
Install Supporting Software
If installing as a single server with integrated database authentication, install the following:
Install EMQ-X
Ubuntu 20.04
wget https://www.emqx.com/en/downloads/broker/v5.1.0/emqx-5.1.0-ubuntu20.04-amd64.deb
sudo dpkg -i ./emqx-5.1.0-ubuntu20.04-amd64.debUbuntu 22.04
wget https://www.emqx.com/en/downloads/broker/v5.1.0/emqx-5.1.0-ubuntu22.04-amd64.deb
sudo dpkg -i ./emqx-5.1.0-ubuntu22.04-amd64.debConfigure EMQ-X
Edit the EMQ-X configuration to accommodate the proxy:
sudo nano /etc/emqx/emqx.confFind the following section and edit as appropriate for the desired node name and cookie for if using clustering:
## Default Node Name: emqx@127.0.0.1
node {
  name = "emqx@192.168.1.91"
  cookie = emqxsecretcookie
  data_dir = "/var/lib/emqx"
  etc_dir = "/etc/emqx"
}Start EMQ-X
sudo service emqx startLogin to EMQ-X
Open your browser to the URL for the server: http://<IPAddr>:18083
The username is admin and the password is public
Be sure to change the password immediately!
Configure EMQ-X for HAProxy
If installing behind a reverse proxy such as HAProxy you will need the following similar configuration inside your HAProxy server configuration:
Make sure to adjust accordingly for SSL and IP address
If clustering, configure the servers similar to this:
frontend mqtt_fe
        mode tcp
        option tcplog
        # Don't allow unencrypted connections
        #bind *:1883
        bind *:8883 ssl crt /etc/ssl/private
        # Reject connections that have an invalid MQTT packet
        tcp-request inspect-delay 10s
        tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
        default_backend mqtt_be
backend mqtt_be
        mode tcp
        balance leastconn
        # Create a stick table for session persistence
        stick-table type string len 32 size 100k expire 30m
        # Use ClientID / client_identifier as persistence key
        stick on req.payload(0,0),mqtt_field_value(connect,client_identifier)
        server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
        server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5If running multiple individual servers modify it to be similar to this:
frontend mqtt_fe
        mode tcp
        option tcplog
        # Don't allow unencrypted connections
        #bind *:1883
        bind *:8883 ssl crt /etc/ssl/private
        # Reject connections that have an invalid MQTT packet
        tcp-request inspect-delay 10s
        tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
        
        acl mqtt1 ssl_fc_sni -i mqtt1.<domain>.com
        acl mqtt2 ssl_fc_sni -i mqtt2.<domain>.com
        use_backend mqtt1_be if mqtt1
        use_backend mqtt2_be if mqtt2
        default_backend mqtt_be
backend mqtt1_be
        mode tcp
        server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
backend mqtt2_be
        mode tcp
        server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5Next, edit the EMQ-X listener settings in the web interface for listener tcp bound to 0.0.0.0:1883 to accommodate the proxy:
- Change the Proxy Protocol setting to true
For a cluster, edit the EMQ-X configuration to change the discovery mode:
sudo nano /etc/emqx/emqx.confFind the following section and set the discovery mode to static, list all the nodes that will be part of the cluster, and make sure the node cookie is set to some secure password that matches on all cluster nodes:
## Cluster auto-discovery strategy.
##
## Value: Enum
## - manual: Manual join command
## - static: Static node list
## - mcast:  IP Multicast
## - dns:    DNS A Record
## - etcd:   etcd
## - k8s:    Kubernetes
##
## Default: manual
cluster {
  name = emqxcl
  discovery_strategy = static
  static {
  	seeds = ["emqx@192.168.1.91", "emqx@192.168.1.92"]
  }
}Configure EMQ-X for Database Authentication using MySQL/MariaDB
Using any SQL tool, create the User and ACL table in the mqtt_user database with the following SQL commands:
CREATE DATABASE `mqtt_user`;
USE mqtt_user;
CREATE TABLE IF NOT EXISTS `mqtt_user` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password_hash` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint DEFAULT 0,
  `created` datetime DEFAULT current_timestamp(),
  `client_group` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=UTF8MB4 COLLATE utf8mb4_general_ci;
CREATE TABLE IF NOT EXISTS `mqtt_acl` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `ipaddress` VARCHAR(60) NOT NULL DEFAULT '',
  `username` VARCHAR(255) NOT NULL DEFAULT '',
  `clientid` VARCHAR(255) NOT NULL DEFAULT '',
  `action` ENUM('publish', 'subscribe', 'all') NOT NULL,
  `permission` ENUM('allow', 'deny') NOT NULL,
  `topic` VARCHAR(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_general_ci;Generate logins using the following SQL command substituting username and password as necessary:
SET @salt=MD5(RAND());
INSERT INTO `mqtt_user` ( `username`, `password_hash`, `salt`) VALUES ('MyUsername', SHA2(CONCAT('MyPassword123', @salt),0), @salt);Grant ACL permissions using the following SQL command substituting the username, and topic as necessary:
ACL rule table field description:
- ipaddress: Set IP address
- username: User name for connecting to the client. If the value is set to $all, the rule applies to all users.
- clientid: Client ID of the connected client
- action: Allowed operations: publish,subscribe,all
- permission: allow,deny
- topic: Topics to be controlled, which can use wildcards, and placeholders can be added to the topic to match client information. For example, the topic will be replaced with the client ID of the current client when matching t/%c- ${username}: Username
- ${clientid}: Client ID
- ${peerhost}: Client's IP Address
 
-- All users cannot subscribe to system topics
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'deny', '$SYS/#');
-- Allow clients on 10.59.1.100 to subscribe to system topics
INSERT INTO mqtt_acl (ipaddress, action, permission, topic) VALUES ('10.59.1.100', 'subscribe', 'allow', '$SYS/#');
-- Deny client to subscribe to the topic of /smarthome/+/temperature
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'deny', '/smarthome/+/temperature');
-- Allow clients to subscribe to the topic of /smarthome/${clientid}/temperature with their own Client ID
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'allow', '/smarthome/${clientid}/temperature');For an easier way, the following stored procedures make adding/removing users for Ignition's CirrusLink MQTT module and ACLs much easier:
DELIMITER $$
CREATE PROCEDURE `sp_addSuperUser`(IN `newUser` VARCHAR(50), IN `newPass` VARCHAR(50))
	NO SQL
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE newUser);
	IF @count = 0 THEN
		-- Generate random salt and create new user
		SET @salt = MD5(RAND());
		INSERT INTO mqtt_user (`username`, `password_hash`, `salt`, `is_superuser`) VALUES (newUser, SHA2(CONCAT(newPass,@salt), 0), @salt, 1);
		-- Make sure ACL doesn't exist already for pub/sub to own node data
		SET @topic = '#';
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newUser AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newUser, 'all', 'allow', @topic);
		END IF;
	END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_addEdgeNode`(IN `ClientName` VARCHAR(50), IN `Site` VARCHAR(50), IN `EdgeNode` VARCHAR(50), IN `newPass` VARCHAR(50))
	NO SQL
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @siteunit = CONCAT(Site,'-',EdgeNode);
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE @siteunit);
	IF @count = 0 THEN
		-- Generate random salt and create new user
		SET @salt = MD5(RAND());
		INSERT INTO mqtt_user (`username`, `password_hash`, `salt`, `client_group`) VALUES (@siteunit, SHA2(CONCAT(newPass,@salt), 0), @salt, ClientName);
		
		-- Make sure ACL doesn't exist already for pub/sub to own node data
		SET @topic = CONCAT('spBv1.0/',ClientName,'/+/${username}');
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'all', 'allow', @topic);
		END IF;
		
		-- Make sure ACL doesn't exist already for pub/sub to own device data
		SET @topic = CONCAT('spBv1.0/',ClientName,'/+/${username}/#');
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'all', 'allow', @topic);
		END IF;
	END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_addAppIDUser`(IN `newAppID` VARCHAR(50), IN `newPass` VARCHAR(50))
	MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user doesn't exist already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username LIKE newAppID);
	IF @count = 0 THEN
		-- Generate random salt and create new user
		SET @salt = MD5(RAND());
		INSERT INTO mqtt_user (`username`, `password_hash`, `salt`) VALUES (newAppID, SHA2(CONCAT(newPass,@salt), 0), @salt);
		
		-- Make sure ACL doesn't exist already for this user to sub to all Sparkplug B topics
		SET @topic = 'spBv1.0/#';
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newAppID, 'all', 'allow', @topic);
		END IF;
		-- Make sure ACL doesn't exist already for pub/sub to own AppID state
		SET @topic = CONCAT('STATE/',newAppID);
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newAppID, 'all', 'allow', @topic);
		END IF;
		-- Make sure ACL doesn't exist already for others to sub to own AppID state
		SET @topic = CONCAT('STATE/',newAppID);
		SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
		IF @count = 0 THEN
			-- Generate ACL
			INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'allow', @topic);
		END IF;
	END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_changePassword`(IN `username` VARCHAR(50), IN `newPass` VARCHAR(50))
	MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user exists already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = username);
	IF @count = 1 THEN
		-- Generate random salt and create new user
		SET @salt = MD5(RAND());
		UPDATE mqtt_user SET `password_hash` = SHA2(CONCAT(newPass,@salt), 0), `salt` = @salt WHERE mqtt_user.username = username;
	END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_removeUser`(IN `username` VARCHAR(50))
	MODIFIES SQL DATA
BEGIN
	-- Check to make sure the user exists already
	SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = username);
	IF @count = 1 THEN
		-- Generate check to see if it has a client group
		SET @clientgroup = (SELECT client_group FROM mqtt_user WHERE mqtt_user.username = username);
		IF @clientgroup IS NULL THEN
			-- Delete ACL records first
			DELETE FROM mqtt_acl WHERE mqtt_acl.username = username;
			DELETE FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('STATE/',username);
		ELSE
			-- Check to make sure the client group isn't used anywhere else
			SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.client_group = @clientgroup);
			-- If only 1 found, then this user is the only user using this group
			IF @count = 1 THEN
				-- Delete ACL records first
				DELETE FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic LIKE CONCAT('spBv1.0/',@clientgroup,'/%');
			END IF;
		END IF;
		-- Delete user record last
		DELETE FROM mqtt_user WHERE mqtt_user.username = username;
	END IF;
END$$
DELIMITER ;Add the EMQ-X Authentication method for MySQL:
Make sure the server, database, username, password, and TLS configuration matches your MySQL settings.
Modify the SQL statement to be as follows to allow for superuser access:
SELECT password_hash, salt, is_superuser FROM mqtt_user where username = ${username} LIMIT 1Next, add the EMQ-X Authorization method for MySQL:
Make sure the server, database, username, password, and TLS configuration matches your MySQL settings.
Modify the SQL statement to be as follows to allow for using the "$all" username placeholder:
SELECT action, permission, topic FROM mqtt_acl where username = ${username} OR username = '$all'We will also need to change the default security to block any unauthorized users:
Delete the File Authorization settings and in the generic Authorization settings, change the following settings:
- No Match: Deny
- Enable Cache: Yes
- Max number of cache per client: 1024
- TTL: 1 minute
