Mam prostego abonenta MQTT zaimplementowanego w klasie MqttHelper, który działa dobrze i odbiera subskrypcje. Ale jak mam sobie radzić, gdy muszę wysłać wiadomość do serwera z głównego programu. Mam metodę publish, która działa dobrze od IMqttActionListener, ale jak wysłać tekst z programu głównego po naciśnięciu przycisku?

package com.kkk.mqtt.helpers;

import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;

public class MqttHelper {
  public MqttAndroidClient mqttAndroidClient;

  final String serverUri = "tcp://tailor.cloudmqtt.com:16424";

  final String clientId = "ExampleAndroidClient";
  public final String subscriptionTopic = "sensor";

  final String username = "xxx";
  final String password = "yyy";  public MqttHelper(Context context){
    mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);

    mqttAndroidClient.setCallback(new MqttCallbackExtended() {
      @Override
      public void connectComplete(boolean b, String s) {
        Log.w("mqtt", s);
      }

      @Override
      public void connectionLost(Throwable throwable) {

      }

      @Override
      public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        Log.w("Mqtt", mqttMessage.toString());
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

      }
    });
    connect();
  }

  public void setCallback(MqttCallbackExtended callback) {
    mqttAndroidClient.setCallback(callback);
  }


  public void publish(String topic, String info)
  {


    byte[] encodedInfo = new byte[0];
    try {
      encodedInfo = info.getBytes("UTF-8");
      MqttMessage message = new MqttMessage(encodedInfo);
      mqttAndroidClient.publish(topic, message);
      Log.e ("Mqtt", "publish done");
    } catch (UnsupportedEncodingException | MqttException e) {
      e.printStackTrace();
      Log.e ("Mqtt", e.getMessage());
    }catch (Exception e) {
      Log.e ("Mqtt", "general exception "+e.getMessage());
    }

  }

  private void connect(){
    Log.w("Mqtt", "connect start " );
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setCleanSession(false);
    mqttConnectOptions.setUserName(username);
    mqttConnectOptions.setPassword(password.toCharArray());

    try {

      mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
      {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
          Log.w("Mqtt", "onSuccess " );
          DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
          disconnectedBufferOptions.setBufferEnabled(true);
          disconnectedBufferOptions.setBufferSize(100);
          disconnectedBufferOptions.setPersistBuffer(false);
          disconnectedBufferOptions.setDeleteOldestMessages(false);
          mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
          subscribeToTopic();
          publish(MqttHelper.this.subscriptionTopic,"information");
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
          Log.w("Mqtt", "Failed to connect to: " + serverUri + exception.toString());
        }
      });


    } catch (MqttException ex){
      ex.printStackTrace();
    }
  }


  private void subscribeToTopic() {
    try {
      mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
          Log.w("Mqtt","Subscribed!");
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
          Log.w("Mqtt", "Subscribed fail!");
        }
      });

    } catch (MqttException ex) {
      System.err.println("Exception whilst subscribing");
      ex.printStackTrace();
    }
  }
}

Kod uruchamiający subskrybenta MQTT:

private void startMqtt() {
  mqttHelper = new MqttHelper(getApplicationContext());
  mqttHelper.setCallback(new MqttCallbackExtended()
  {
    @Override
    public void connectComplete(boolean b, String s) {
      Log.w("Mqtt", "Connect complete"+ s );
    }

    @Override
    public void connectionLost(Throwable throwable) {
      Log.w("Mqtt", "Connection lost" );
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
      Log.w("Mqtt", mqttMessage.toString());
      dataReceived.setText(mqttMessage.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
      Log.w("Mqtt", "Delivery complete" );

    }
  });
  Log.w("Mqtt", "will publish");


}
0
vico 20 listopad 2019, 13:30

2 odpowiedzi

Paho nie działa w wątku interfejsu użytkownika, ale może asynchronicznie odwoływać się do wątku interfejsu użytkownika.

Po prostu pozwól Activity lub Fragment zaimplementować interfejs MqttCallbackExtended:

public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended { 

  ...

  @Override
  public void connectComplete(boolean reconnect, String serverURI) {
    Log.d("Mqtt", "Connect complete > " + serverURI);
  }

  @Override
  public void connectionLost(Throwable cause) {
    Log.d("Mqtt", "Connection lost");
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws Exception {
    Log.d("Mqtt", "Received > " + topic + " > " + message.toString());
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
    Log.d("Mqtt", "Delivery complete");
  }
}

I skonstruuj MqttHelper z SomeActivity tak, jak jest MqttCallbackExtended listener:

public MqttHelper(Context context, MqttCallbackExtended listener) {
  this.mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
  this.mqttAndroidClient.setCallback(listener);
}

Na przykład:

this.mqttHelper = new MqttHelper(this);
this.mqttHelper.setCallback(this);

this.mqttHelper.publish("Java", "SomeActivity will handle the callbacks.");

Obsługa ich w Application jest problematyczna, ponieważ Application nie ma interfejsu użytkownika, a Context nie ma Theme. Ale dla klas rozciągających się na Activity, Fragment, DialogFragment, RecyclerView.Adapter itd. sensowne jest zaimplementowanie wywołania zwrotnego interface, gdy chcesz wchodzić w interakcję z ich interfejs użytkownika.


Dla porównania, MqttCallbackExtended extends MqttCallback.

2
Martin Zeitler 20 listopad 2019, 15:33

Inne rozwiązanie:

 1. Utwórz klasę MQTTService, która rozszerza android.app.Service.
  Klasa Android Service działa w wątku głównym. Więc jeśli chcesz użyć innego wątku, możesz po prostu użyć MqttAsyncClient.

 2. Będziesz otrzymywać wiadomości od brokera w innym wątku automatycznie (nie głównym wątku) w messageArrived() przy użyciu metody wywołania zwrotnego.

 3. Przekaż dane/polecenie z interfejsu użytkownika aplikacji (fragment aktywności,...) do MQTTService przez < strong>EventBus biblioteka po prostu.

 4. Ponownie użyj EventBus w metodzie wywołania zwrotnego messageArrived(), aby przekazać odebrane dane od brokera do żądanej sekcji aplikacji.
  Pamiętaj, że w tym kroku, jeśli miejscem docelowym jest interfejs aplikacji, musisz użyć @Subscribe(threadMode = ThreadMode.MAIN) w miejscu docelowym, aby uzyskać dane w głównym wątku.

Przykładowy kod:

public class MQTTService extends Service {

  private MqttAsyncClient mqttClient;
  private String serverURI;

   @Override
  public void onCreate() { 
    //do your initialization here
    serverURI = "tcp://yourBrokerUrlOrIP:yourBrokerPort";
    EventBus.getDefault().register(this);
  }

  @Override
  public int onStartCommand(Intent intent, int flags, int startId) {
   init();
   connect();
  }  private void init() {
     mqttClient = new MqttAsyncClient(serverURI, yourClientId, new MemoryPersistence())
     mqttClient.setCallback(new MqttCallback() {
      @Override
      public void connectionLost(Throwable cause) {
      }

      @Override
      public void messageArrived(String topic, MqttMessage message) throws Exception {
        //now you will receive messages from the broker in another thread automatically (not UI Thread).
        //You can do your logic here. for example pass the received data to the different sections of the application:
        EventBus.getDefault().post(new YourPOJO(topic, message, ...));
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken token) {
      }
    });
  }

  private MqttConnectOptions getOptions(){
    MqttConnectOptions options = new MqttConnectOptions();
    options.setKeepAliveInterval(...);
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    options.setAutomaticReconnect(true);
    options.setCleanSession(false);
    options.setUserName(...);
    options.setPassword(...);
    //options.setWill(...);
    //your other configurations
    return options;
  }

  private void connect() {
    try {
      IMqttToken token = mqttClient.connect(getOptions(), null, new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
          //do works after successful connection
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        }
      });

    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void onDestroy() {
    EventBus.getDefault().unregister(this);
    mqttClient.close();
    mqttClient.disconnect();
  }

  //this method receives your command from the different application sections
  //you can simply create different "MqttCommandPOJO" classes for different purposes
  @Subscribe
  public void receiveFromApp1(MqttCommandPOJO1 pojo1) {
    //do your logic(1). For example:
    //publish or subscribe something to the broker (QOS=1 is a good choice).
  }

  @Subscribe
  public void receiveFromApp2(MqttCommandPOJO2 pojo2) {
    //do your logic(2). For example:
    //publish or subscribe something to the broker (QOS=1 is a good choice).
  }

}

Teraz możesz po prostu otrzymywać dane przekazane z MQTTService w każdej sekcji swojej aplikacji.

Na przykład:

public class MainActivity extends AppCompatActivity {
  ...

  @Subscribe(threadMode = ThreadMode.MAIN)
  public void receiveFromMQTTService(YourPojo pojo){
    //Do your logic. For example update the UI.
  }
}

Inne linki:
Instrukcje ogólne


Wszystkiego najlepszego

0
Abdul Alim Shakir 10 grudzień 2019, 11:03