Commit be22070e authored by Lindenberg, Arthur-Vincent's avatar Lindenberg, Arthur-Vincent
Browse files

python based mqtt client for matlab with example and binary data pub/sub

parent c381e987
lib/__pycache__/
%% alter path to your python (3.8 is tested) distribution here
py_path = 'C:\Users\Arthur-Vincent\AppData\Local\Programs\Python\Python38\python.exe';
broker_ip = '127.0.0.1';
%% doing init
addpath('../lib');
myMqtt = mqtt_py_matlab(py_path);
myMqtt.connect_mqtt(broker_ip);
pause(1);
myMqtt.sub_mqtt_topics(broker_ip, "hello/test");
myMqtt.pub_mqtt(broker_ip, "hello/test", "WORLD!");
pause(1);
fprintf('%s', myMqtt.get_payload_str("hello/test"));
signal = rand(1, 2000)';
signal_b = typecast(single(signal), 'uint8');
myMqtt.sub_mqtt_topics(broker_ip, "hello/numericBinary");
myMqtt.pub_mqtt_binary(broker_ip, "hello/numericBinary", signal_b);
signal_in_b = myMqtt.get_payload_float("hello/numericBinary");
signal_in = cell2mat(cell(signal_in_b));
subplot(2,1,1);
plot(signal);
title('MQTT submitted signal');
subplot(2,1,2);
plot(signal_in);
title('MQTT received signal');
import paho.mqtt.client as mqtt
import logging
import socket
import struct
import ctypes
import threading
cfg = {
'application_example_stuff': {
'ip': "10.21.1.77"
},
'serial_example_stuff': {
'port': "Com8",
'baud': 115200
},
# ---------------------------
# take logging as it is
# Levels:
# FATAL = 50
# ERROR = 40
# WARNING = 30
# INFO = 20
# DEBUG = 10
# ---------------------------
'logging': {
'level': logging.INFO,
'filename': "log.txt",
'console': True,
'format': "%(asctime)s %(levelname)s: %(message)s"
}
}
# ------------------------------
# STARTING APPLICATION
# Use this section as it is
# ------------------------------
if cfg['logging']['console']:
logging.basicConfig(level=cfg['logging']['level'], format=cfg['logging']['format'])
else:
logging.basicConfig(filename=cfg['logging']['filename'], filemode='w', level=cfg['logging']['level'],
format=cfg['logging']['format'])
if cfg['logging']['console']:
ch = logging.StreamHandler()
ch.setLevel(cfg['logging']['level'])
def init_globals():
global isConnected
global mqtt_received
global mqtt_subscriptions
global queue_lck
isConnected = ""
mqtt_received = []
mqtt_subscriptions = []
queue_lck = threading.Lock()
def on_connect(client, userdata, flags, rc):
global isConnected
isConnected = "CONNECTED"
def on_message(client, userdata, msg):
global mqtt_rec_msg
global mqtt_received
global mqtt_subscriptions
global clientMqtt
global queue_lck
logging.debug("received top: " + msg.topic + " | received payload: " + str(msg.payload))
queue_lck.acquire()
try:
if mqtt_subscriptions.count(msg.topic) > 0:
mqtt_received.append(msg)
mqtt_subscriptions.remove(msg.topic)
if mqtt_subscriptions.count(msg.topic) < 1:
clientMqtt.unsubscribe(msg.topic)
finally:
queue_lck.release()
def on_disconnect(client, userdata, rc):
global isConnected
isConnected = ""
def connect(broker, keepalive=60):
global clientMqtt
global isConnected
if isConnected == "CONNECTED":
return
clientMqtt = mqtt.Client(socket.gethostname())
clientMqtt.on_connect = on_connect
clientMqtt.on_message = on_message
clientMqtt.on_disconnect = on_disconnect
clientMqtt.connect(broker, 1883, keepalive)
while isConnected != "CONNECTED":
clientMqtt.loop(.5)
def pub_mqtt_message(broker, topic, message):
global clientMqtt
connect(broker, 60)
clientMqtt.publish(topic, message)
clientMqtt.loop(.01)
def pub_mqtt_binary(broker, topic, message_bin):
global clientMqtt
connect(broker, 60)
clientMqtt.publish(topic, bytes(message_bin))
clientMqtt.loop(.01)
def sub_mqtt_topics(broker, topics):
global clientMqtt
global mqtt_subscriptions
global queue_lck
connect(broker, 60)
queue_lck.acquire()
try:
for t in topics:
mqtt_subscriptions.append(t)
clientMqtt.subscribe(t)
logging.debug("subscribed: " + t)
logging.debug(mqtt_subscriptions)
finally:
queue_lck.release()
def get_mqtt_msg():
global mqtt_received
if mqtt_received.len() > 0:
ret_elm = mqtt_received[0]
mqtt_received.pop(0)
return ret_elm
else:
clientMqtt.loop(.01)
return 0
def get_payload_str(topic):
global mqtt_received
global mqtt_subscriptions
for msg in mqtt_received:
if msg.topic == topic:
ret_val = str(msg.payload)
mqtt_received.remove(msg)
return ret_val
if mqtt_subscriptions.count(topic) > 0:
loop_keepalive()
return ""
return "EMPTY"
def get_payload_float(topic):
global mqtt_received
global mqtt_subscriptions
logging.debug(mqtt_subscriptions)
logging.debug(mqtt_received)
logging.debug(topic)
for msg in mqtt_received:
if msg.topic == topic:
arr_size = len(msg.payload) / ctypes.sizeof(ctypes.c_float)
ret_val = struct.unpack(str(int(arr_size)) + 'f', msg.payload)
mqtt_received.remove(msg)
return ret_val
if mqtt_subscriptions.count(topic) > 0:
loop_keepalive()
return float(0)
return float(-1)
def loop_keepalive():
global clientMqtt
clientMqtt.loop(.1)
def disconnect():
global clientMqtt
global isConnected
isConnected = ""
clientMqtt.disconnect()
classdef mqtt_py_matlab < handle
%MQTT_PY_MATLAB wrapper to read MQTT messages
properties (Access = public)
log_active = 0
log_text = ""
end
methods
function obj = mqtt_py_matlab(python_exe_64_path)
%MQTT_PY_MATLAB Construct an instance of this class
% python_exe_64_path: path to 64bit python exe
fprintf("Setting up local virtual environment");
envName = "py_venv";
system("virtualenv --python " + python_exe_64_path + " " + envName)
system(".\" + envName + "\Scripts\activate");
system(envName + "\Scripts\pip install paho-mqtt");
try
pyversion(".\" + envName + "\Scripts\python.exe");
catch
end
pyversion
mqttClientPath = fileparts(which('cosaMqttClient.py'));
if count(py.sys.path,mqttClientPath) == 0
insert(py.sys.path,int32(0),mqttClientPath);
end
py.cosaMqttClient.init_globals();
end
function delete(obj)
py.cosaMqttClient.disconnect();
if obj.log_active > 0
obj.log_text = [obj.log_text; "Client Disconnected"];
end
end
function connect_mqtt(obj, broker)
py.cosaMqttClient.connect(broker);
if obj.log_active > 0
obj.log_text = [obj.log_text; "Client Connected to Broker: " + broker];
end
end
function pub_mqtt(obj, broker, topic, message)
topic = cellstr(topic);
message = cellstr(message);
py.cosaMqttClient.pub_mqtt_message(broker, topic{1}, message{1});
if obj.log_active > 0
obj.log_text = [obj.log_text ; "PUB topic: " + topic + " | Message: " + message];
end
pause(0.01);
end
function pub_mqtt_binary(obj, broker, topic, message_bin)
topic = cellstr(topic);
py.cosaMqttClient.pub_mqtt_binary(broker, topic{1}, message_bin);
if obj.log_active > 0
obj.log_text = {obj.log_text ; "PUB topic: " + topic + " | Message: " + message_bin};
end
end
function sub_mqtt_topics(obj, broker, topics)
topics = cellstr(topics);
py.cosaMqttClient.sub_mqtt_topics(broker, topics);
if obj.log_active > 0
obj.log_text = [obj.log_text ; "SUB topics: " + topics];
end
end
function payload = get_payload_str(obj, topic)
topic = cellstr(topic);
while 1
payload = string(py.cosaMqttClient.get_payload_str(topic{1}));
if payload == "EMPTY"
error("Tried to read topic without result or active subscription");
end
if payload ~= ""
if obj.log_active > 0
obj.log_text = [obj.log_text; "Received Topic: " + topic{1} + " | Message: " + payload];
end
return;
end
end
end
function payload = get_payload_float(obj, topic)
topic = cellstr(topic);
while 1
payload = py.cosaMqttClient.get_payload_float(topic{1});
if payload == -1
error("Tried to read topic without result or active subscription");
end
if payload ~= 0
if obj.log_active > 0
obj.log_text = [obj.log_text; "Received Topic: " + topic{1} + " | Message: " + payload];
end
return;
end
end
end
function payload = get_payload_float_once(obj, topic)
topic = cellstr(topic);
payload = py.cosaMqttClient.get_payload_float(topic{1});
end
end
methods (Static)
function loop_keepalive()
py.cosaMqttClient.loop_keepalive();
end
end
end
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment