Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Removed translated content for 'zh'
Sv translation
languageen


Info
BellaDati IoT Collector SDK is an API allowing to provide own Receiver and Sender implementations.


Warning
In order to build valid and working Receiver or Sender distribution, you need to get Developers distribution from BellaDati.


Implementing own Receiver or Sender module

BellaDati IoT Data Collector Developers distribution allows to extend implemented core classes GenericReceiver and GenericSender.

Development process:

  1. Obtain BellaDati IoT Data Collector Developers distribution (contact BellaDati support team, it's not publicly available)
  2. Implement own Receiver or Sender implementation and owne MainVerticle
  3. Build project and your own JAR module
  4. Run built JAR and use it in your IoT Data Collector deployment


Tip

BellaDati IoT Data Collector is built using vert.x framework.


Custom Receiver

Customizing Receiver module is helpful in situations, when custom endpoint type or customized message handling is necessary.

Basic approach:

  • extend class GenericReceiver
  • implement MainVerticle.class
  • override initCustom(future) method with own implementation
    • processMessage(..) and finishProcessing(..) must be called inside message handling implementation
  • or override getJsonFromBuffer(..)/getJsonFromContext(..) if only message processing is needed to be customized
  • override stop() method, super.stop() must be included
  • set endpoint type to CUSTOM (in file configuration or using IoT Console)

Example (custom UDP server):

Code Block
languagejava
linenumberstrue
package com.belladati.iot.collector.generic.receiver.verticle;

import com.belladati.iot.collector.common.util.VerticleStatus;
import com.belladati.iot.collector.generic.receiver.verticle.MessageProcessor.ProcessedMessage;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.json.JsonObject;

public class CustomReceiver extends GenericReceiver {
	
	private DatagramSocket datagramSocket;
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#initCustom(io.vertx.core.Future)
	 */
	@Override
	protected void initCustom(Future<Void> future) {
		JsonObject udpConfig = config().getJsonObject("customConfig");
		int port = udpConfig.getInteger("port", 1234);
		String host = udpConfig.getString("host", "localhost");
		DatagramSocketOptions options = new DatagramSocketOptions(udpConfig.getJsonObject("options", new JsonObject()));
		datagramSocket = vertx.createDatagramSocket(options);
		datagramSocket.listen(port, host, ar -> {
			if (ar.succeeded()) {
				future.complete();
				log.info("Successfully listening on port " + port);
				ar.result().handler(packet -> {
					ProcessedMessage pm = processMessage(getJsonFromBuffer(packet.data()));
					if (pm.isError() || pm.isIgnore()) {
						log.error(pm.isIgnore() ? "message ignored" : pm.getError());
						return;
					}
					log.info("Accepting message: " + pm.getMessage());
					finishMessageProcessing(System.currentTimeMillis(), pm);
				});
			} else {
				future.fail(ar.cause());
				log.error("Datagram socket listen failed", ar.cause());
			}
		});
	}
 
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromContext(io.vertx.ext.web.RoutingContext)
	 */
	@Override
	protected JsonObject getJsonFromContext(RoutingContext routingContext) {
		//TODO implement own message payload processing for HTTP
		return super.getJsonFromContext(routingContext);
	}
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromBuffer(io.vertx.core.buffer.Buffer)
	 */
	@Override
	protected JsonObject getJsonFromBuffer(Buffer b) {
		//TODO implement own message payload processing for MQTT and UDP datagrams
		return super.getJsonFromBuffer(b);
	}
	
	/* (non-Javadoc)
	 * @see io.vertx.core.AbstractVerticle#stop()
	 */
	@Override
	public void stop(Future<Void> future) throws Exception {
		datagramSocket.close(ar -> {
			super.stop(future);
		});
	}
}


Custom Sender

Customizing Sender module is helpful in situations, when custom automated action is necessary.

Basic approach:

  • extend GenericSender class with own implementation
  • implement Action interface
  • override createCustomAction() method and return own Action class
  • implement MainVerticle.class
  • add action type CUSTOM to the configuration

Example:

Code Block
languagejava
linenumberstrue
package com.belladati.iot.collector.generic.sender.verticle;

import java.io.File;
import java.time.Instant;
import java.util.function.Function;
import com.belladati.iot.collector.common.BellaDatiVerticle;
import com.belladati.iot.collector.common.util.BellaDatiUtils;
import com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;

public class CustomAction implements Action {
	private boolean isInitialized;
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#init()
	 */
	@Override
	public Function<Void, Future<Void>> init() {
		return v -> {
			log.info("Initializing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement initialisation if needed
            isInitialized = true;
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#close()
	 */
	@Override
	public Function<Void, Future<Void>> close() {
		return v -> {
			log.info("Closing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement closing method if needed
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#process(com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData)
	 */
	@Override
    public Function<Boolean, Future<Boolean>> process(ProcessedData message);
        return prevResult -> {
			if (!prevResult) {
				return Future.succeededFuture(false);
			}
			JsonArray data = message.getData();
			Future<Boolean> future = Future.future();
			if (!data.isEmpty()) {
				//TODO implement own message processing method
				future.complete(true);
			} else {
				future.complete(false);
            }
			return future;
		};
		return true;
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#isContinueOnFailure()
	 */
	@Override
	public boolean isInitialized() {
		return isInitialized;
	}
}


Implement MainVerticle.class

Code Block
languagejava
linenumberstrue
public class MainVerticle extends AbstractVerticle {
	private static final Logger log = LoggerFactory.getLogger(MainVerticle.class);
	@Override
	public void start(Future<Void> startFuture) throws Exception {
		JsonObject config = config();
		JsonArray receivers = config.getJsonArray("receivers", new JsonArray());
		if (receivers.isEmpty()) {
			throw new IllegalArgumentException("No receivers specified");
		}
		List<Future> futures = new ArrayList<>();
		receivers.forEach(obj -> {
			futures.add(startChild(obj));
		});
		CompositeFuture.all(futures).setHandler(ar -> {
			if (ar.failed()) {
				log.error("Cannot deploy " + receivers.size() + " receivers. Shutting down ...", ar.cause());
				startFuture.fail(ar.cause());
				vertx.deploymentIDs().forEach(vertx::undeploy);
			} else {
				log.info("Deployed " + receivers.size() + " receivers - " + vertx.deploymentIDs());
				startFuture.complete();
			}
		});
	}

	private Future<String> startChild(Object jsonObject) {
		JsonObject receiver = (JsonObject) jsonObject;
		Future<String> future = Future.future();
		try {
			vertx.deployVerticle(CustomReceiver.class.getName(), new DeploymentOptions().setConfig(receiver), future.completer());
		} catch (Exception e) {
			future.fail(e);
		}
		return future;
	}
}


Sv translation
languageja


Info

BellaDati IoTコレクターSDKは、独自のReceiverとSenderの実装を提供できるようにするAPIです。


Warning

有効に機能するReceiverやSenderディストリビューションを構築するには、BellaDatiからDevelopersディストリビューションを入手する必要があります。


独自のReceiverまたはSenderモジュールの実装

BellaDati IoT Data Collector Developersディストリビューションでは、実装されたコアクラス GenericReceiver および GenericSenderを拡張できます。

開発プロセス:

  1. BellaDati IoT Data Collector Developersディストリビューションを入手する (BellaDatiサポートチームにご連絡ください。公開されていません)
  2. 独自のReceiverまたはSenderの実装と、独自のMainVerticleを実装する
  3. プロジェクトと独自のJARモジュールをビルドする
  4. ビルドされたJARを実行し、IoT Data Collectorデプロイメントで使用する


Tip

BellaDati IoT Data Collectorは、vert.xフレームワークを使用して構築されています。


カスタムレシーバー (Receiver)

Receiverモジュールのカスタマイズは、カスタムエンドポイントタイプまたはカスタマイズされたメッセージ処理が必要な状況で役立ちます。

基本的なアプローチ:

  • GenericReceiver クラスを拡張します
  • MainVerticle.class を実装します
  • 独自の実装で initCustom(future) メソッドをオーバーライドします
    • processMessage(..) と finishProcessing(..) は、メッセージ処理の実装内で呼び出す必要があります
  • または、メッセージ処理のみをカスタマイズする必要がある場合は、getJsonFromBuffer(..)/getJsonFromContext(..) をオーバーライドします
  • stop() メソッドをオーバーライドし、super.stop() を含める必要があります
  • エンドポイントタイプをCUSTOMに設定します (ファイル構成またはIoTコンソールを使用)

例 (カスタムUDPサーバ):

Code Block
languagejava
linenumberstrue
package com.belladati.iot.collector.generic.receiver.verticle;

import com.belladati.iot.collector.common.util.VerticleStatus;
import com.belladati.iot.collector.generic.receiver.verticle.MessageProcessor.ProcessedMessage;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.json.JsonObject;

public class CustomReceiver extends GenericReceiver {
	
	private DatagramSocket datagramSocket;
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#initCustom(io.vertx.core.Future)
	 */
	@Override
	protected void initCustom(Future<Void> future) {
		JsonObject udpConfig = config().getJsonObject("customConfig");
		int port = udpConfig.getInteger("port", 1234);
		String host = udpConfig.getString("host", "localhost");
		DatagramSocketOptions options = new DatagramSocketOptions(udpConfig.getJsonObject("options", new JsonObject()));
		datagramSocket = vertx.createDatagramSocket(options);
		datagramSocket.listen(port, host, ar -> {
			if (ar.succeeded()) {
				future.complete();
				log.info("Successfully listening on port " + port);
				ar.result().handler(packet -> {
					ProcessedMessage pm = processMessage(getJsonFromBuffer(packet.data()));
					if (pm.isError() || pm.isIgnore()) {
						log.error(pm.isIgnore() ? "message ignored" : pm.getError());
						return;
					}
					log.info("Accepting message: " + pm.getMessage());
					finishMessageProcessing(System.currentTimeMillis(), pm);
				});
			} else {
				future.fail(ar.cause());
				log.error("Datagram socket listen failed", ar.cause());
			}
		});
	}
 
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromContext(io.vertx.ext.web.RoutingContext)
	 */
	@Override
	protected JsonObject getJsonFromContext(RoutingContext routingContext) {
		//TODO implement own message payload processing for HTTP
		return super.getJsonFromContext(routingContext);
	}
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromBuffer(io.vertx.core.buffer.Buffer)
	 */
	@Override
	protected JsonObject getJsonFromBuffer(Buffer b) {
		//TODO implement own message payload processing for MQTT and UDP datagrams
		return super.getJsonFromBuffer(b);
	}
	
	/* (non-Javadoc)
	 * @see io.vertx.core.AbstractVerticle#stop()
	 */
	@Override
	public void stop(Future<Void> future) throws Exception {
		datagramSocket.close(ar -> {
			super.stop(future);
		});
	}
}


カスタムセンダー (Sender)

Senderモジュールのカスタマイズは、カスタムの自動アクションが必要な状況で役立ちます。

基本的なアプローチ:

  • GenericSender クラスを独自の実装で拡張します
  • Action インターフェイスを実装します
  • createCustomAction() メソッドをオーバーライドし、独自の Action クラスを返します
  • MainVerticle.class を実装します
  • アクションタイプCUSTOMを構成に追加します

例:

Code Block
languagejava
linenumberstrue
package com.belladati.iot.collector.generic.sender.verticle;

import java.io.File;
import java.time.Instant;
import java.util.function.Function;
import com.belladati.iot.collector.common.BellaDatiVerticle;
import com.belladati.iot.collector.common.util.BellaDatiUtils;
import com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;

public class CustomAction implements Action {
	private boolean isInitialized;
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#init()
	 */
	@Override
	public Function<Void, Future<Void>> init() {
		return v -> {
			log.info("Initializing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement initialisation if needed
            isInitialized = true;
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#close()
	 */
	@Override
	public Function<Void, Future<Void>> close() {
		return v -> {
			log.info("Closing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement closing method if needed
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#process(com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData)
	 */
	@Override
    public Function<Boolean, Future<Boolean>> process(ProcessedData message);
        return prevResult -> {
			if (!prevResult) {
				return Future.succeededFuture(false);
			}
			JsonArray data = message.getData();
			Future<Boolean> future = Future.future();
			if (!data.isEmpty()) {
				//TODO implement own message processing method
				future.complete(true);
			} else {
				future.complete(false);
            }
			return future;
		};
		return true;
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#isContinueOnFailure()
	 */
	@Override
	public boolean isInitialized() {
		return isInitialized;
	}
}


MainVerticle.class の実装

Code Block
languagejava
linenumberstrue
public class MainVerticle extends AbstractVerticle {
	private static final Logger log = LoggerFactory.getLogger(MainVerticle.class);
	@Override
	public void start(Future<Void> startFuture) throws Exception {
		JsonObject config = config();
		JsonArray receivers = config.getJsonArray("receivers", new JsonArray());
		if (receivers.isEmpty()) {
			throw new IllegalArgumentException("No receivers specified");
		}
		List<Future> futures = new ArrayList<>();
		receivers.forEach(obj -> {
			futures.add(startChild(obj));
		});
		CompositeFuture.all(futures).setHandler(ar -> {
			if (ar.failed()) {
				log.error("Cannot deploy " + receivers.size() + " receivers. Shutting down ...", ar.cause());
				startFuture.fail(ar.cause());
				vertx.deploymentIDs().forEach(vertx::undeploy);
			} else {
				log.info("Deployed " + receivers.size() + " receivers - " + vertx.deploymentIDs());
				startFuture.complete();
			}
		});
	}

	private Future<String> startChild(Object jsonObject) {
		JsonObject receiver = (JsonObject) jsonObject;
		Future<String> future = Future.future();
		try {
			vertx.deployVerticle(CustomReceiver.class.getName(), new DeploymentOptions().setConfig(receiver), future.completer());
		} catch (Exception e) {
			future.fail(e);
		}
		return future;
	}
}