Mam prostego abonenta MQTT zaimplementowanego w klasie MqttHelper
, który działa dobrze i odbiera subskrypcje. Ale jak mam sobie radzić, gdy muszę wysłać wiadomość do serwera z głównego programu. Mam metodę publish
, która działa dobrze od IMqttActionListener
, ale jak wysłać tekst z programu głównego po naciśnięciu przycisku?
package com.kkk.mqtt.helpers;
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.UnsupportedEncodingException;
public class MqttHelper {
public MqttAndroidClient mqttAndroidClient;
final String serverUri = "tcp://tailor.cloudmqtt.com:16424";
final String clientId = "ExampleAndroidClient";
public final String subscriptionTopic = "sensor";
final String username = "xxx";
final String password = "yyy";
public MqttHelper(Context context){
mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
Log.w("mqtt", s);
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
connect();
}
public void setCallback(MqttCallbackExtended callback) {
mqttAndroidClient.setCallback(callback);
}
public void publish(String topic, String info)
{
byte[] encodedInfo = new byte[0];
try {
encodedInfo = info.getBytes("UTF-8");
MqttMessage message = new MqttMessage(encodedInfo);
mqttAndroidClient.publish(topic, message);
Log.e ("Mqtt", "publish done");
} catch (UnsupportedEncodingException | MqttException e) {
e.printStackTrace();
Log.e ("Mqtt", e.getMessage());
}catch (Exception e) {
Log.e ("Mqtt", "general exception "+e.getMessage());
}
}
private void connect(){
Log.w("Mqtt", "connect start " );
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
{
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt", "onSuccess " );
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
publish(MqttHelper.this.subscriptionTopic,"information");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Failed to connect to: " + serverUri + exception.toString());
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
private void subscribeToTopic() {
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.w("Mqtt","Subscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.w("Mqtt", "Subscribed fail!");
}
});
} catch (MqttException ex) {
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
}
Kod uruchamiający subskrybenta MQTT:
private void startMqtt() {
mqttHelper = new MqttHelper(getApplicationContext());
mqttHelper.setCallback(new MqttCallbackExtended()
{
@Override
public void connectComplete(boolean b, String s) {
Log.w("Mqtt", "Connect complete"+ s );
}
@Override
public void connectionLost(Throwable throwable) {
Log.w("Mqtt", "Connection lost" );
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("Mqtt", mqttMessage.toString());
dataReceived.setText(mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.w("Mqtt", "Delivery complete" );
}
});
Log.w("Mqtt", "will publish");
}
2 odpowiedzi
Paho nie działa w wątku interfejsu użytkownika, ale może asynchronicznie odwoływać się do wątku interfejsu użytkownika.
Po prostu pozwól Activity
lub Fragment
zaimplementować interfejs MqttCallbackExtended
:
public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended {
...
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.d("Mqtt", "Connect complete > " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
Log.d("Mqtt", "Connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d("Mqtt", "Received > " + topic + " > " + message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d("Mqtt", "Delivery complete");
}
}
I skonstruuj MqttHelper
z SomeActivity
tak, jak jest MqttCallbackExtended listener
:
public MqttHelper(Context context, MqttCallbackExtended listener) {
this.mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
this.mqttAndroidClient.setCallback(listener);
}
Na przykład:
this.mqttHelper = new MqttHelper(this);
this.mqttHelper.setCallback(this);
this.mqttHelper.publish("Java", "SomeActivity will handle the callbacks.");
Obsługa ich w Application
jest problematyczna, ponieważ Application
nie ma interfejsu użytkownika, a Context
nie ma Theme
. Ale dla klas rozciągających się na Activity
, Fragment
, DialogFragment
, RecyclerView.Adapter
itd. sensowne jest zaimplementowanie wywołania zwrotnego interface
, gdy chcesz wchodzić w interakcję z ich interfejs użytkownika.
Dla porównania, MqttCallbackExtended
extends
MqttCallback
.
Inne rozwiązanie:
Utwórz klasę
MQTTService
, która rozszerzaandroid.app.Service
.
Klasa Android Service działa w wątku głównym. Więc jeśli chcesz użyć innego wątku, możesz po prostu użyćMqttAsyncClient
.Będziesz otrzymywać wiadomości od brokera w innym wątku automatycznie (nie głównym wątku) w
messageArrived()
przy użyciu metody wywołania zwrotnego.Przekaż dane/polecenie z interfejsu użytkownika aplikacji (fragment aktywności,...) do
MQTTService
przez < strong>EventBus biblioteka po prostu.- Ponownie użyj EventBus w metodzie wywołania zwrotnego
messageArrived()
, aby przekazać odebrane dane od brokera do żądanej sekcji aplikacji.
Pamiętaj, że w tym kroku, jeśli miejscem docelowym jest interfejs aplikacji, musisz użyć@Subscribe(threadMode = ThreadMode.MAIN)
w miejscu docelowym, aby uzyskać dane w głównym wątku.
Przykładowy kod:
public class MQTTService extends Service {
private MqttAsyncClient mqttClient;
private String serverURI;
@Override
public void onCreate() {
//do your initialization here
serverURI = "tcp://yourBrokerUrlOrIP:yourBrokerPort";
EventBus.getDefault().register(this);
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
connect();
}
private void init() {
mqttClient = new MqttAsyncClient(serverURI, yourClientId, new MemoryPersistence())
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//now you will receive messages from the broker in another thread automatically (not UI Thread).
//You can do your logic here. for example pass the received data to the different sections of the application:
EventBus.getDefault().post(new YourPOJO(topic, message, ...));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
}
private MqttConnectOptions getOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(...);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setUserName(...);
options.setPassword(...);
//options.setWill(...);
//your other configurations
return options;
}
private void connect() {
try {
IMqttToken token = mqttClient.connect(getOptions(), null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
//do works after successful connection
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onDestroy() {
EventBus.getDefault().unregister(this);
mqttClient.close();
mqttClient.disconnect();
}
//this method receives your command from the different application sections
//you can simply create different "MqttCommandPOJO" classes for different purposes
@Subscribe
public void receiveFromApp1(MqttCommandPOJO1 pojo1) {
//do your logic(1). For example:
//publish or subscribe something to the broker (QOS=1 is a good choice).
}
@Subscribe
public void receiveFromApp2(MqttCommandPOJO2 pojo2) {
//do your logic(2). For example:
//publish or subscribe something to the broker (QOS=1 is a good choice).
}
}
Teraz możesz po prostu otrzymywać dane przekazane z MQTTService
w każdej sekcji swojej aplikacji.
Na przykład:
public class MainActivity extends AppCompatActivity {
...
@Subscribe(threadMode = ThreadMode.MAIN)
public void receiveFromMQTTService(YourPojo pojo){
//Do your logic. For example update the UI.
}
}
Inne linki:
Instrukcje ogólne
Wszystkiego najlepszego
Podobne pytania
Nowe pytania
java
Java to język programowania wysokiego poziomu. Użyj tego tagu, jeśli masz problemy z używaniem lub zrozumieniem samego języka. Ten tag jest rzadko używany samodzielnie i jest najczęściej używany w połączeniu z [spring], [spring-boot], [jakarta-ee], [android], [javafx], [hadoop], [gradle] i [maven].