2024年2月にVersion2となり、そのままでは『‘Unsupported callback API version’ error』となるので解決したコードを将来使うかもしれないので備忘録として載せておきます。(やっとStackOverFlowで見つけました。)
Broker
MQTTはBroker経由で通信するので、Brokerがないと通信できません。 色々なサンプルコードを見ていると、『broker = ‘broker.emqx.io’』となっています。 最初はそれで通信しようとしましたが、どうも『broker.emqx.io』に登録しないといけないみたいで、無料枠は30日だけのようでした。 なので、pub.pyとsub.pyを実行しているWindows PCにEclipse mosquittoのx64版をダウンロードしてインストールしてBrokerを立上てpub,sub通信をしました。Download | Eclipse Mosquitto
Version2
2024年2月以降普通に『pip install paho-mqtt』とするとVersion2がインストールされてしまいます。
変更点
- on_connect()に新たに第5引数が出来ており、その第5引数に『properties』と記入しないといけない。
- mqtt.Client()にcallback_api_versionにVERSINO2を指定しないとデフォルトのVERSION1のAPIを呼び出してしまうので、標記のエラーが発生してしまう。
- Python runtimeは、Verion3.7以降でないといけない。
1、2を変更して、Brokerは同じPC内なので『localhost』として指定。
なかなか良い解決策を見つけるのが難しかったので備忘録として残しておきます。
import random import time from paho.mqtt import client as mqtt_client broker = 'localhost' port = 1883 topic = "python/mqtt" client_id = f'publish-ichiri-{random.randint(0, 1000)}' def connect_mqtt(): print("***ichiri in connect_mqtt PUB") def on_connect(client, userdata, flags, rc,properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 1 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 if msg_count > 5: break def run(): client = connect_mqtt() client.loop_start() publish(client) client.loop_stop() if __name__ == '__main__': run()
on_disconnectは使っていません。 最初通信しなかった時のデバッグ用にと思って入れたのですが、60秒ごとにDisconnectedになりprintされ通信しようとしているのだなとはわかりましたが、『Unspecified ERROR』等原因が特定できあまりなく意味がありませんでした。 将来、on_disconnectも役に立つかもしれないので載せておきます。
import random from paho.mqtt import client as mqtt_client # broker = 'broker.emqx.io' broker = 'localhost' port = 1883 topic = "python/mqtt" client_id = f'python-mqtt-xx-{random.randint(0, 1000)}' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc, properties): if flags.session_present: print("flag.session") if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) def on_disconnect(client, userdata,flags, rc, properties): print("Disconnected:",client, "Reason Code:",rc, "USER DATA:",userdata,"FLAGs:", flags, "PROPERTIES:", properties) client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) client.on_connect = on_connect client.on_disconnect = on_disconnect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) #loop_foreverの受信時に使用するcallback関数の登録 client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
loop_forever()
これは以下を自動的に繰り返し実行してくれます。
- 接続確立
- もし接続が不安定で切れたとしても自動的に再接続してくれる。
- メッセージの受信
- client.on_messageに登録した関数をcallbackとして使用して、何度も受信しても登録したcallbackを実行してくれる。 上記の場合はdef on_message()
- 接続の維持
loop_foreverに登録されるcallback
それぞれイベント発生時に実行されます。
- on_connect
- on_disconnect
- on_message
- on_publish
- on_subscribe
- on_unsubscribe
参考
サンプルコード
paho-mqttを『-g』(global)なしでインストールすると以下のディレクトリにサンプルコードが入っていました。
C:\Users\<user_name>\AppData\Local\Programs\Python\Python311\Lib\site-packages\paho\mqtt
別PCでBroker
pub.pyとsub.pyを実行するWindows PCと同じサブネット内にあるuBuntu PCでEclipse Mosquitto Brokerを実行して、そのIPアドレスを指定したのですが、何故か通信できず。 Windows PCのZscaller Proxyのせいか、原因は分からずしまい。
client.connect(broker, port)の前に以下を挿入したらOKになるかもしれない。
client.proxy_set(proxy_type=socks.PROXY_TYPE_HTTP, proxy_addr="gateway.zscaler.net", proxy_port=80)
コメント