Abstract

AWS IoT Core を使っていた Python アプリを、ローカル環境でも動作できるようにしたかったため、 ActiveMQ Classic(MQTTブローカー)+ Spring Boot(JMS)+ Python(Paho MQTT) という構成でミニマムなサンプルを作りました。

ローカルで MQTT を試したいときや、IoT Core の代替実装を検証したいときなどに役立つと思うので、手順をまとめておきます。

1. Overview

今回の構成はシンプルで、

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
+---------------------+
|  Python (Paho MQTT) |
+---------------------+
           |
           | MQTT (1883)
           v
+----------------------+
|   ActiveMQ Classic   |
+----------------------+
           |
           | JMS (61616)
           v
+----------------------+
|   Spring Boot (JMS)  |
+----------------------+

という流れになります。

ポイントは、ActiveMQ が MQTT ↔ JMS を自動でブリッジしてくれること。
そのため、Spring Boot は JMS だけで書いて、Python 側は MQTT のままで OK です。

2. ActiveMQ Classic(MQTT)をローカルで起動する

Docker で立ち上げるのが簡単です。

docker-compose.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
version: '3.8'

services:
  activemq:
    image: rmohr/activemq:latest
    container_name: activemq
    ports:
      - "61616:61616"   # JMS
      - "1883:1883"     # MQTT
      - "8161:8161"     # Web Console
    environment:
      - ACTIVEMQ_USER=admin
      - ACTIVEMQ_PASSWORD=admin
    restart: unless-stopped

起動:

1
docker compose up -d

Webコンソール:

1
http://localhost:8161/

3. Spring Boot(JMS)側の設定

Gradle + application.yaml の構成。

build.gradle(主要部分のみ)

1
2
3
4
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-activemq'
}

application.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin

  jms:
    pub-sub-domain: true

server:
  port: 8080

3.1. JMS Topic(MQTT に対応する Topic 名)

MQTT(iot/topic/sample)と対応するように、JMS 側はドット区切りにするのがポイント。

1
public static final String MQTT_TOPIC_JMS = "iot.topic.sample";

ActiveMQ が以下のように自動変換してくれるため:

  • JMS: iot.topic.sample
  • MQTT: iot/topic/sample

3.2. Spring Boot の Producer / Listener

3.2.1. Producer(JMS → MQTT)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Service
public class MessageProducer {

    private final JmsTemplate jms;

    public MessageProducer(JmsTemplate jms) {
        this.jms = jms;
    }

    public void send(String message) {
        jms.convertAndSend(message);
    }
}

3.2.2. Listener(MQTT → JMS)

1
2
3
4
5
6
7
8
9
@Component
public class MessageConsumer {

    @JmsListener(destination = JmsConfig.MQTT_TOPIC_JMS,
                 containerFactory = "jmsListenerContainerFactory")
    public void receive(String message) {
        System.out.println("Received: " + message);
    }
}

3.2.3. TestController(動作テスト用)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/api/test")
public class TestController {

    private final MessageProducer producer;

    public TestController(MessageProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String send(@RequestBody String body) {
        producer.send(body);
        return "sent: " + body;
    }
}

Tips:
@RequestBody String を使っているため、curl の Content-Type は text/plain が必要。 application/json を使う場合は POJO を受ける形に変更してください。

4. Python(Paho MQTT)クライアント

1
pip install paho-mqtt
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import paho.mqtt.client as mqtt

BROKER = "localhost"
PORT   = 1883
TOPIC  = "iot/topic/sample"

def on_connect(client, userdata, flags, rc):
    print("connected:", rc)
    client.subscribe(TOPIC, qos=1)

def on_message(client, userdata, msg):
    print("[MQTT]", msg.topic, msg.payload.decode())

client = mqtt.Client(client_id="local-python-client", protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_message = on_message

client.username_pw_set("admin", "admin")
client.connect(BROKER, PORT)
client.loop_start()

# publish loop
i = 0
while True:
    payload = f"hello {i}"
    print("pub:", payload)
    client.publish(TOPIC, payload)
    i += 1
    time.sleep(5)

5. curl(POST)

1
2
3
4
curl -X POST \
  -H "Content-Type: text/plain" \
  --data "hello from curl" \
  http://localhost:8080/api/test

6. Execution Result

  1. ActiveMQ を起動
  2. Spring Boot を起動
  3. Python を起動
  4. 別ターミナルから curl を実行

すると、

Python 側に

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
❯ python ./client.py
/path/to/sample_app/mqttclient/./client.py:22: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt.Client(client_id="local-python-client", protocol=mqtt.MQTTv311)
Publishing: hello 0
Connected with result code: 0
Publishing: hello 1
[MQTT] Topic: iot/topic/sample, Payload: hello 1
[MQTT] Topic: iot/topic/sample, Payload: hello from curl
Publishing: hello 2
[MQTT] Topic: iot/topic/sample, Payload: hello 2
Publishing: hello 3
[MQTT] Topic: iot/topic/sample, Payload: hello 3

Tips:
DeprecationWarning は Paho MQTT の Callback API v1 に関する警告です。
動作自体には影響ありません。(気になる場合は v2 API に移行可能)

SpringBoot 側に

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
22:52:22:  ':com.example.mqttserver.MqttserverApplication.main()' を実行中…

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :com.example.mqttserver.MqttserverApplication.main()

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v4.0.0)

2025-11-25T22:52:23.194+09:00  INFO 29525 --- [mqttserver] [           main] c.e.mqttserver.MqttserverApplication     : Starting MqttserverApplication using Java 25.0.1 with PID 29525 (/path/to/mqttserver/.../build/classes/java/main started by takuto_n in /path/to/mqttserver/...)
2025-11-25T22:52:23.195+09:00  INFO 29525 --- [mqttserver] [           main] c.e.mqttserver.MqttserverApplication     : No active profile set, falling back to 1 default profile: "default"
2025-11-25T22:52:23.492+09:00  INFO 29525 --- [mqttserver] [           main] o.s.boot.tomcat.TomcatWebServer          : Tomcat initialized with port 8080 (http)
2025-11-25T22:52:23.498+09:00  INFO 29525 --- [mqttserver] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-11-25T22:52:23.498+09:00  INFO 29525 --- [mqttserver] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/11.0.14]
2025-11-25T22:52:23.512+09:00  INFO 29525 --- [mqttserver] [           main] b.w.c.s.WebApplicationContextInitializer : Root WebApplicationContext: initialization completed in 293 ms
2025-11-25T22:52:23.661+09:00  INFO 29525 --- [mqttserver] [           main] o.s.boot.tomcat.TomcatWebServer          : Tomcat started on port 8080 (http) with context path '/'
2025-11-25T22:52:23.829+09:00  INFO 29525 --- [mqttserver] [           main] c.e.mqttserver.MqttserverApplication     : Started MqttserverApplication in 0.763 seconds (process running for 0.878)
Received from MQTT/JMS: 104,101,108,108,111,32,48
Received from MQTT/JMS: 104,101,108,108,111,32,49
2025-11-25T22:52:38.933+09:00  INFO 29525 --- [mqttserver] [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-11-25T22:52:38.934+09:00  INFO 29525 --- [mqttserver] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-11-25T22:52:38.935+09:00  INFO 29525 --- [mqttserver] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
Received from MQTT/JMS: hello from curl

と表示される。

Tips:
最初のログがバイト列になっているのは ActiveMQ→JMS の変換で ByteMessage が使われるため。 後続のメッセージは String として扱われるため問題ありません。

7. Summary

  • ActiveMQ Classic を中央ブローカーにすれば、 MQTT(Python) ↔ JMS(Spring Boot) が自然に連携できる

  • ローカルで IoT Core の代替検証をしたいときに便利

  • トピック名は、以下のようにするのがポイント。(.区切り/区切り)

    • JMS:iot.topic.sample
    • MQTT:iot/topic/sample

Tips:
JMS と MQTT のトピック名は自動変換されるため、
JMS は iot.topic.sample、
MQTT は iot/topic/sample にする必要があります。