Install EMQ-X MQTT Broker (v5.x)
Install Supporting Software
If installing as a single server with integrated database authentication, install the following:
Install EMQ-X
Ubuntu 20.04
wget https://www.emqx.com/en/downloads/broker/v5.8.6/emqx-5.8.6-ubuntu20.04-amd64.deb
sudo dpkg -i ./emqx-5.8.6-ubuntu20.04-amd64.deb
Ubuntu 22.04
wget https://www.emqx.com/en/downloads/broker/v5.8.6/emqx-5.8.6-ubuntu22.04-amd64.deb
sudo dpkg -i ./emqx-5.8.6-ubuntu22.04-amd64.deb
Ubuntu 24.04
wget https://www.emqx.com/en/downloads/broker/v5.8.6/emqx-5.8.6-ubuntu24.04-amd64.deb
sudo dpkg -i ./emqx-5.8.6-ubuntu24.04-amd64.deb
Configure EMQ-X
Edit the EMQ-X configuration file:
sudo nano /etc/emqx/emqx.conf
Find the following section and set the desired node name and, if using clustering, the cookie:
## Default Node Name: emqx@127.0.0.1
node {
  name = "emqx@192.168.1.91"
  cookie = emqxsecretcookie
  data_dir = "/var/lib/emqx"
  etc_dir = "/etc/emqx"
}
Start EMQ-X
sudo service emqx start
Log in to EMQ-X
Open your browser to the URL for the server: http://<IPAddr>:18083
The default username is admin and the default password is public.
Be sure to change the password immediately!
Configure EMQ-X for HAProxy
If installing behind a reverse proxy such as HAProxy, add a configuration similar to the following to your HAProxy server configuration.
Adjust the SSL certificate path and the IP addresses to match your environment.
If clustering, configure the servers similar to this:
frontend mqtt_fe
    mode tcp
    option tcplog
    # Don't allow unencrypted connections
    #bind *:1883
    bind *:8883 ssl crt /etc/ssl/private
    # Reject connections that have an invalid MQTT packet
    tcp-request inspect-delay 10s
    tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
    default_backend mqtt_be
backend mqtt_be
    mode tcp
    balance leastconn
    # Create a stick table for session persistence
    stick-table type string len 32 size 100k expire 30m
    # Use ClientID / client_identifier as persistence key
    stick on req.payload(0,0),mqtt_field_value(connect,client_identifier)
    server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
    server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
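To make the persistence key concrete, the following is a minimal Python sketch (not part of the HAProxy setup) that builds an MQTT 3.1.1 CONNECT packet and extracts the client identifier, which is the field the stick on rule above keys on. The function names build_connect and client_identifier are illustrative assumptions, and the parser only handles well-formed CONNECT packets.

```python
import struct


def _encode_varint(value):
    """Encode an MQTT variable byte integer (used for the remaining length)."""
    out = bytearray()
    while True:
        byte = value % 128
        value //= 128
        if value:
            byte |= 0x80  # continuation bit: more bytes follow
        out.append(byte)
        if not value:
            return bytes(out)


def _decode_varint(buf, pos):
    """Decode a variable byte integer; return (value, next position)."""
    value, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def build_connect(client_id, keepalive=60):
    """Build a minimal MQTT 3.1.1 CONNECT packet (clean session, no credentials)."""
    cid = client_id.encode("utf-8")
    body = (
        struct.pack("!H", 4) + b"MQTT"   # protocol name
        + bytes([4, 0x02])               # protocol level 4, clean-session flag
        + struct.pack("!H", keepalive)
        + struct.pack("!H", len(cid)) + cid
    )
    return bytes([0x10]) + _encode_varint(len(body)) + body


def client_identifier(packet):
    """Return the client identifier carried in a CONNECT packet."""
    if packet[0] >> 4 != 1:
        raise ValueError("not a CONNECT packet")
    _, pos = _decode_varint(packet, 1)           # skip remaining length
    (name_len,) = struct.unpack_from("!H", packet, pos)
    pos += 2 + name_len                          # skip protocol name
    level = packet[pos]
    pos += 1 + 1 + 2                             # level, connect flags, keep alive
    if level == 5:                               # MQTT 5 adds a properties block
        prop_len, pos = _decode_varint(packet, pos)
        pos += prop_len
    (cid_len,) = struct.unpack_from("!H", packet, pos)
    pos += 2
    return packet[pos:pos + cid_len].decode("utf-8")


print(client_identifier(build_connect("edge-01")))
```

Because the stick table above is declared with len 32, it is worth checking that your client IDs fit within that length, and raising len if they do not.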
If running multiple standalone servers instead, modify the configuration to be similar to this:
frontend mqtt_fe
    mode tcp
    option tcplog
    # Don't allow unencrypted connections
    #bind *:1883
    bind *:8883 ssl crt /etc/ssl/private
    # Reject connections that have an invalid MQTT packet
    tcp-request inspect-delay 10s
    tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }
    # Route each connection by the TLS SNI host name
    acl mqtt1 ssl_fc_sni -i mqtt1.<domain>.com
    acl mqtt2 ssl_fc_sni -i mqtt2.<domain>.com
    use_backend mqtt1_be if mqtt1
    use_backend mqtt2_be if mqtt2
    # mqtt_be must still be defined elsewhere in the configuration;
    # otherwise point the default at mqtt1_be or mqtt2_be
    default_backend mqtt_be
backend mqtt1_be
    mode tcp
    server mqtt1 192.168.1.91:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
backend mqtt2_be
    mode tcp
    server mqtt2 192.168.1.92:1883 check-send-proxy send-proxy-v2 check inter 10s fall 2 rise 5
Next, edit the EMQ-X listener settings in the web interface for the tcp listener bound to 0.0.0.0:1883 to accommodate the proxy:
- Change the Proxy Protocol setting to true
For a cluster, edit the EMQ-X configuration to change the discovery mode:
sudo nano /etc/emqx/emqx.conf
Find the following section, set the discovery strategy to static, list every node that will be part of the cluster, and make sure the node cookie is a strong secret that is identical on all cluster nodes:
## Cluster auto-discovery strategy.
##
## Value: Enum
## - manual: Manual join command
## - static: Static node list
## - mcast: IP Multicast
## - dns: DNS A Record
## - etcd: etcd
## - k8s: Kubernetes
##
## Default: manual
cluster {
  name = emqxcl
  discovery_strategy = static
  static {
    seeds = ["emqx@192.168.1.91", "emqx@192.168.1.92"]
  }
}
Configure EMQ-X for Database Authentication using MySQL/MariaDB
Using any SQL tool, create the mqtt_user database and its user and ACL tables with the following SQL commands:
CREATE DATABASE `mqtt_user`;
USE mqtt_user;
CREATE TABLE IF NOT EXISTS `mqtt_user` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password_hash` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint DEFAULT 0,
  `created` datetime DEFAULT current_timestamp(),
  `client_group` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_general_ci;
CREATE TABLE IF NOT EXISTS `mqtt_acl` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `ipaddress` VARCHAR(60) NOT NULL DEFAULT '',
  `username` VARCHAR(255) NOT NULL DEFAULT '',
  `clientid` VARCHAR(255) NOT NULL DEFAULT '',
  `action` ENUM('publish', 'subscribe', 'all') NOT NULL,
  `permission` ENUM('allow', 'deny') NOT NULL,
  `topic` VARCHAR(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_general_ci;
Generate logins using the following SQL commands, substituting the username and password as necessary:
SET @salt=MD5(RAND());
INSERT INTO `mqtt_user` ( `username`, `password_hash`, `salt`) VALUES ('MyUsername', SHA2(CONCAT('MyPassword123', @salt),0), @salt);
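The hashing scheme used by the SQL above can be reproduced outside the database, which is handy for checking a stored row or for generating credentials from a provisioning script. The following is a minimal Python sketch, assuming the password and salt are UTF-8 text; SHA2(..., 0) in MySQL/MariaDB selects SHA-256, and the salt is appended after the password. The function names are illustrative, and make_salt uses a cryptographic random source rather than MD5(RAND()), which is a deliberate substitution.

```python
import hashlib
import hmac
import secrets


def make_salt():
    """Return 32 hex characters, the same shape as MD5(RAND()) but from a CSPRNG."""
    return secrets.token_hex(16)


def hash_password(password, salt):
    """Mirror SHA2(CONCAT(password, salt), 0): SHA-256 over password followed by salt."""
    return hashlib.sha256((password + salt).encode("utf-8")).hexdigest()


def verify(password, salt, stored_hash):
    """Compare a candidate password against a stored password_hash value."""
    return hmac.compare_digest(hash_password(password, salt), stored_hash)


salt = make_salt()
password_hash = hash_password("MyPassword123", salt)
print(salt, password_hash)
```

The resulting salt fits the varchar(35) salt column and the 64-character hex digest fits the varchar(100) password_hash column.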
Grant ACL permissions using SQL commands like the examples below, substituting the username and topic as necessary.
ACL rule table field description:
- ipaddress: Client IP address the rule applies to
- username: Username the client connects with. If the value is set to $all, the rule applies to all users.
- clientid: Client ID of the connected client
- action: Operation the rule covers: publish, subscribe, or all
- permission: allow or deny
- topic: Topic or topic filter to be controlled. Wildcards are supported, and placeholders can be added to the topic to match client information. For example, t/${clientid} is replaced with the client ID of the current client when matching. Placeholders:
  - ${username}: Username
  - ${clientid}: Client ID
  - ${peerhost}: Client's IP address
-- All users cannot subscribe to system topics
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'deny', '$SYS/#');
-- Allow clients on 10.59.1.100 to subscribe to system topics
INSERT INTO mqtt_acl (ipaddress, action, permission, topic) VALUES ('10.59.1.100', 'subscribe', 'allow', '$SYS/#');
-- Deny client to subscribe to the topic of /smarthome/+/temperature
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'deny', '/smarthome/+/temperature');
-- Allow clients to subscribe to the topic of /smarthome/${clientid}/temperature with their own Client ID
INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'allow', '/smarthome/${clientid}/temperature');
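To illustrate how a rule's topic column relates to the topic a client actually uses, here is a minimal Python sketch of placeholder expansion followed by MQTT wildcard matching. It is a simplification for illustration only, not EMQ-X's implementation: it ignores rule ordering and treats the client's topic as a plain topic name. The function names and the dev1/dev2 client IDs are hypothetical.

```python
def expand_placeholders(topic_filter, clientid, username):
    """Substitute the ${clientid} and ${username} placeholders in a rule topic."""
    return (
        topic_filter
        .replace("${clientid}", clientid)
        .replace("${username}", username)
    )


def topic_matches(topic_filter, topic):
    """Return True if topic matches the filter, honoring + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True                    # multi-level wildcard matches the rest
        if i >= len(topic_levels):
            return False                   # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False                   # literal level differs
    return len(filter_levels) == len(topic_levels)


rule = expand_placeholders(
    "/smarthome/${clientid}/temperature", clientid="dev1", username="alice"
)
print(rule)
print(topic_matches(rule, "/smarthome/dev1/temperature"))
print(topic_matches(rule, "/smarthome/dev2/temperature"))
```

Since several of the example rules above overlap (a deny and an allow on related topics), it is worth confirming in a test environment which rule takes effect for a given client.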
Alternatively, the following stored procedures simplify adding and removing users and ACLs for Ignition's CirrusLink MQTT module:
DELIMITER $$
CREATE PROCEDURE `sp_addSuperUser`(IN `newUser` VARCHAR(50), IN `newPass` VARCHAR(50))
MODIFIES SQL DATA
BEGIN
  -- Check to make sure the user doesn't exist already (exact match, no LIKE wildcards)
  SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = newUser);
  IF @count = 0 THEN
    -- Generate random salt and create new user
    SET @salt = MD5(RAND());
    INSERT INTO mqtt_user (`username`, `password_hash`, `salt`, `is_superuser`) VALUES (newUser, SHA2(CONCAT(newPass,@salt), 0), @salt, 1);
    -- Make sure ACL doesn't exist already for pub/sub to all topics
    SET @topic = '#';
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newUser AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newUser, 'all', 'allow', @topic);
    END IF;
  END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_addEdgeNode`(IN `ClientName` VARCHAR(50), IN `Site` VARCHAR(50), IN `EdgeNode` VARCHAR(50), IN `newPass` VARCHAR(50))
MODIFIES SQL DATA
BEGIN
  -- Check to make sure the user doesn't exist already (exact match, no LIKE wildcards)
  SET @siteunit = CONCAT(Site,'-',EdgeNode);
  SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = @siteunit);
  IF @count = 0 THEN
    -- Generate random salt and create new user
    SET @salt = MD5(RAND());
    INSERT INTO mqtt_user (`username`, `password_hash`, `salt`, `client_group`) VALUES (@siteunit, SHA2(CONCAT(newPass,@salt), 0), @salt, ClientName);
    -- Make sure ACL doesn't exist already for pub/sub to own node data
    SET @topic = CONCAT('spBv1.0/',ClientName,'/+/${username}');
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'all', 'allow', @topic);
    END IF;
    -- Make sure ACL doesn't exist already for pub/sub to own device data
    SET @topic = CONCAT('spBv1.0/',ClientName,'/+/${username}/#');
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'all', 'allow', @topic);
    END IF;
  END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_addAppIDUser`(IN `newAppID` VARCHAR(50), IN `newPass` VARCHAR(50))
MODIFIES SQL DATA
BEGIN
  -- Check to make sure the user doesn't exist already (exact match, no LIKE wildcards)
  SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = newAppID);
  IF @count = 0 THEN
    -- Generate random salt and create new user
    SET @salt = MD5(RAND());
    INSERT INTO mqtt_user (`username`, `password_hash`, `salt`) VALUES (newAppID, SHA2(CONCAT(newPass,@salt), 0), @salt);
    -- Make sure ACL doesn't exist already for this user to pub/sub to all Sparkplug B topics
    SET @topic = 'spBv1.0/#';
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newAppID, 'all', 'allow', @topic);
    END IF;
    -- Make sure ACL doesn't exist already for pub/sub to own AppID state
    SET @topic = CONCAT('STATE/',newAppID);
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = newAppID AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES (newAppID, 'all', 'allow', @topic);
    END IF;
    -- Make sure ACL doesn't exist already for others to sub to own AppID state
    SET @topic = CONCAT('STATE/',newAppID);
    SET @count = (SELECT COUNT(*) FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = @topic);
    IF @count = 0 THEN
      -- Generate ACL
      INSERT INTO mqtt_acl (username, action, permission, topic) VALUES ('$all', 'subscribe', 'allow', @topic);
    END IF;
  END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_changePassword`(IN `username` VARCHAR(50), IN `newPass` VARCHAR(50))
MODIFIES SQL DATA
BEGIN
  -- Check to make sure the user exists already
  SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = username);
  IF @count = 1 THEN
    -- Generate a new random salt and update the stored password hash
    SET @salt = MD5(RAND());
    UPDATE mqtt_user SET `password_hash` = SHA2(CONCAT(newPass,@salt), 0), `salt` = @salt WHERE mqtt_user.username = username;
  END IF;
END$$
DELIMITER ;
DELIMITER $$
CREATE PROCEDURE `sp_removeUser`(IN `username` VARCHAR(50))
MODIFIES SQL DATA
BEGIN
  -- Check to make sure the user exists already
  SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.username = username);
  IF @count = 1 THEN
    -- Check whether the user has a client group
    SET @clientgroup = (SELECT client_group FROM mqtt_user WHERE mqtt_user.username = username);
    IF @clientgroup IS NULL THEN
      -- Delete ACL records first
      DELETE FROM mqtt_acl WHERE mqtt_acl.username = username;
      DELETE FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic = CONCAT('STATE/',username);
    ELSE
      -- Check to make sure the client group isn't used anywhere else
      SET @count = (SELECT COUNT(*) FROM mqtt_user WHERE mqtt_user.client_group = @clientgroup);
      -- If only 1 found, then this user is the only user using this group
      IF @count = 1 THEN
        -- Delete ACL records first
        DELETE FROM mqtt_acl WHERE mqtt_acl.username = '$all' AND mqtt_acl.topic LIKE CONCAT('spBv1.0/',@clientgroup,'/%');
      END IF;
    END IF;
    -- Delete user record last
    DELETE FROM mqtt_user WHERE mqtt_user.username = username;
  END IF;
END$$
DELIMITER ;
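To see what sp_addEdgeNode produces before running it, the following minimal Python sketch mirrors its string handling: the username is built as Site-EdgeNode, and two ACL topics containing the ${username} placeholder are stored under the $all user. The function names and the Acme, Plant1, and Node7 values are hypothetical examples, not names from this guide.

```python
def edge_node_rules(client_name, site, edge_node):
    """Mirror the username and ACL topics that sp_addEdgeNode generates."""
    username = f"{site}-{edge_node}"
    acl_topics = [
        # Node-level Sparkplug B topics: spBv1.0/<group>/<message type>/<node>
        f"spBv1.0/{client_name}/+/${{username}}",
        # Device-level topics below the node
        f"spBv1.0/{client_name}/+/${{username}}/#",
    ]
    return {
        "username": username,
        "client_group": client_name,
        "acl_topics": acl_topics,
    }


def effective_topics(rules):
    """Expand the ${username} placeholder as it would apply to this user."""
    return [t.replace("${username}", rules["username"]) for t in rules["acl_topics"]]


rules = edge_node_rules("Acme", "Plant1", "Node7")
print(rules["username"])
for topic in effective_topics(rules):
    print(topic)
```

Because the rows are keyed on the placeholder rather than on a specific username, every edge node in the same client group shares the same two ACL rows, which is why sp_removeUser only deletes them when the last user in the group is removed.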
Add the EMQ-X Authentication method for MySQL:
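Make sure the server, database, username, password, and TLS configuration match your MySQL settings.
Set the password hash algorithm to sha256 with the salt position set to suffix, which matches how the password_hash values are generated by the SQL above.
Modify the SQL statement to be as follows to allow for superuser access:
SELECT password_hash, salt, is_superuser FROM mqtt_user WHERE username = ${username} LIMIT 1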
Next, add the EMQ-X Authorization method for MySQL:
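Make sure the server, database, username, password, and TLS configuration match your MySQL settings.
Modify the SQL statement to be as follows to allow for using the "$all" username placeholder:
SELECT action, permission, topic FROM mqtt_acl WHERE username = ${username} OR username = '$all'
Note that this query selects rules by username only, so rows that set only ipaddress or clientid are not returned unless the WHERE clause is extended to cover those columns.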
Finally, change the default authorization behavior to block any unauthorized users:
Delete the File Authorization settings, then change the following settings in the generic Authorization settings:
- No Match: Deny
- Enable Cache: Yes
- Max number of cache per client: 1024
- TTL: 1 minute
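The practical effect of the cache settings above is that an authorization decision can be reused for up to the TTL, so a change to mqtt_acl may take up to a minute to apply to clients that already have a cached result. The following minimal Python sketch shows that behavior with a per-client cache bounded by size and age. It is an illustration under stated assumptions, not EMQ-X's actual implementation; the AuthzCache class and its eviction policy (drop the oldest entry when full) are hypothetical.

```python
import time
from collections import OrderedDict


class AuthzCache:
    """Per-client cache of (action, topic) -> decision with a size cap and TTL."""

    def __init__(self, max_size=1024, ttl_seconds=60.0, clock=time.monotonic):
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock              # injectable so expiry can be tested
        self._entries = OrderedDict()    # key -> (decision, stored_at)

    def get(self, action, topic):
        """Return the cached decision, or None if missing or expired."""
        key = (action, topic)
        entry = self._entries.get(key)
        if entry is None:
            return None
        decision, stored_at = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]       # expired: force a fresh database lookup
            return None
        return decision

    def put(self, action, topic, decision):
        """Store a decision, evicting the oldest entry when the cache is full."""
        key = (action, topic)
        self._entries.pop(key, None)
        if len(self._entries) >= self._max_size:
            self._entries.popitem(last=False)
        self._entries[key] = (decision, self._clock())


cache = AuthzCache()
cache.put("subscribe", "spBv1.0/#", "allow")
print(cache.get("subscribe", "spBv1.0/#"))
```

If ACL changes need to take effect faster than the TTL allows, lowering the TTL trades more database queries for quicker propagation.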