BellaDati IoT Collector SDK is an API that lets you provide your own Receiver and Sender implementations.
To build a valid, working Receiver or Sender distribution, you need to obtain the Developers distribution from BellaDati.


Implementing own Receiver or Sender module

The BellaDati IoT Data Collector Developers distribution lets you extend the core classes GenericReceiver and GenericSender.

Development process:

  1. Obtain BellaDati IoT Data Collector Developers distribution (contact BellaDati support team, it's not publicly available)
  2. Implement your own Receiver or Sender and your own MainVerticle
  3. Build project and your own JAR module
  4. Run built JAR and use it in your IoT Data Collector deployment


BellaDati IoT Data Collector is built using vert.x framework.


Custom Receiver

Customizing the Receiver module is helpful when a custom endpoint type or customized message handling is necessary.

Basic approach:

  • extend class GenericReceiver
  • implement MainVerticle.class
  • override initCustom(future) method with own implementation
    • processMessage(..) and finishMessageProcessing(..) must be called inside the message handling implementation (see the example below)
  • or override getJsonFromBuffer(..)/getJsonFromContext(..) if only the message processing needs to be customized
  • override the stop(..) method; the override must call super.stop(..)
  • set endpoint type to CUSTOM (in file configuration or using IoT Console)

Example (custom UDP server):

package com.belladati.iot.collector.generic.receiver.verticle;

import com.belladati.iot.collector.common.util.VerticleStatus;
import com.belladati.iot.collector.generic.receiver.verticle.MessageProcessor.ProcessedMessage;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.json.JsonObject;

public class CustomReceiver extends GenericReceiver {
	
	private DatagramSocket datagramSocket;
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#initCustom(io.vertx.core.Future)
	 */
	@Override
	protected void initCustom(Future<Void> future) {
		JsonObject udpConfig = config().getJsonObject("customConfig");
		int port = udpConfig.getInteger("port", 1234);
		String host = udpConfig.getString("host", "localhost");
		DatagramSocketOptions options = new DatagramSocketOptions(udpConfig.getJsonObject("options", new JsonObject()));
		datagramSocket = vertx.createDatagramSocket(options);
		datagramSocket.listen(port, host, ar -> {
			if (ar.succeeded()) {
				future.complete();
				log.info("Successfully listening on port " + port);
				ar.result().handler(packet -> {
					ProcessedMessage pm = processMessage(getJsonFromBuffer(packet.data()));
					if (pm.isError() || pm.isIgnore()) {
						log.error(pm.isIgnore() ? "message ignored" : pm.getError());
						return;
					}
					log.info("Accepting message: " + pm.getMessage());
					finishMessageProcessing(System.currentTimeMillis(), pm);
				});
			} else {
				future.fail(ar.cause());
				log.error("Datagram socket listen failed", ar.cause());
			}
		});
	}
 
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromContext(io.vertx.ext.web.RoutingContext)
	 */
	@Override
	protected JsonObject getJsonFromContext(RoutingContext routingContext) {
		//TODO implement own message payload processing for HTTP
		return super.getJsonFromContext(routingContext);
	}
	
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromBuffer(io.vertx.core.buffer.Buffer)
	 */
	@Override
	protected JsonObject getJsonFromBuffer(Buffer b) {
		//TODO implement own message payload processing for MQTT and UDP datagrams
		return super.getJsonFromBuffer(b);
	}
	
	/* (non-Javadoc)
	 * @see io.vertx.core.AbstractVerticle#stop()
	 */
	@Override
	public void stop(Future<Void> future) throws Exception {
		datagramSocket.close(ar -> {
			super.stop(future);
		});
	}
}


Custom Sender

Customizing the Sender module is helpful when a custom automated action is necessary.

Basic approach:

  • extend GenericSender class with own implementation
  • implement Action interface
  • override createCustomAction() method and return own Action class
  • implement MainVerticle.class
  • add action type CUSTOM to the configuration

Example:

package com.belladati.iot.collector.generic.sender.verticle;

import java.io.File;
import java.time.Instant;
import java.util.function.Function;
import com.belladati.iot.collector.common.BellaDatiVerticle;
import com.belladati.iot.collector.common.util.BellaDatiUtils;
import com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;

public class CustomAction implements Action {
	private boolean isInitialized;
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#init()
	 */
	@Override
	public Function<Void, Future<Void>> init() {
		return v -> {
			log.info("Initializing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement initialisation if needed
            isInitialized = true;
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#close()
	 */
	@Override
	public Function<Void, Future<Void>> close() {
		return v -> {
			log.info("Closing action: " + getClass().getSimpleName());
			Future<Void> future = Future.future();
			//TODO implement closing method if needed
			future.complete(v);
			return future;
		};
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#process(com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData)
	 */
	@Override
    public Function<Boolean, Future<Boolean>> process(ProcessedData message);
        return prevResult -> {
			if (!prevResult) {
				return Future.succeededFuture(false);
			}
			JsonArray data = message.getData();
			Future<Boolean> future = Future.future();
			if (!data.isEmpty()) {
				//TODO implement own message processing method
				future.complete(true);
			} else {
				future.complete(false);
            }
			return future;
		};
		return true;
	}
	/* (non-Javadoc)
	 * @see com.belladati.iot.collector.generic.sender.verticle.Action#isContinueOnFailure()
	 */
	@Override
	public boolean isInitialized() {
		return isInitialized;
	}
}


Implement MainVerticle.class

/**
 * Entry-point verticle: deploys one {@code CustomReceiver} for every entry of the
 * {@code receivers} array in the verticle configuration.
 */
public class MainVerticle extends AbstractVerticle {
	private static final Logger log = LoggerFactory.getLogger(MainVerticle.class);

	/**
	 * Deploys all configured receivers and completes {@code startFuture} once every
	 * deployment has finished; on any failure the future is failed and all current
	 * deployments are undeployed.
	 *
	 * @param startFuture future reporting the outcome of the start-up
	 * @throws IllegalArgumentException if the configuration lists no receivers
	 */
	@Override
	public void start(Future<Void> startFuture) throws Exception {
		JsonArray receivers = config().getJsonArray("receivers", new JsonArray());
		if (receivers.isEmpty()) {
			throw new IllegalArgumentException("No receivers specified");
		}

		// Raw Future is what CompositeFuture.all(List) accepts.
		List<Future> deployments = new ArrayList<>();
		for (Object receiverConfig : receivers) {
			deployments.add(startChild(receiverConfig));
		}

		CompositeFuture.all(deployments).setHandler(outcome -> {
			if (outcome.succeeded()) {
				log.info("Deployed " + receivers.size() + " receivers - " + vertx.deploymentIDs());
				startFuture.complete();
				return;
			}
			log.error("Cannot deploy " + receivers.size() + " receivers. Shutting down ...", outcome.cause());
			startFuture.fail(outcome.cause());
			for (String deploymentId : vertx.deploymentIDs()) {
				vertx.undeploy(deploymentId);
			}
		});
	}

	/**
	 * Deploys a single {@code CustomReceiver} using the given array entry as its configuration.
	 *
	 * @param jsonObject one entry of the {@code receivers} array; cast to {@code JsonObject}
	 * @return future completed by the deployment, or failed if the deploy call throws
	 */
	private Future<String> startChild(Object jsonObject) {
		JsonObject receiverConfig = (JsonObject) jsonObject;
		Future<String> deployed = Future.future();
		try {
			DeploymentOptions options = new DeploymentOptions().setConfig(receiverConfig);
			vertx.deployVerticle(CustomReceiver.class.getName(), options, deployed.completer());
		} catch (Exception e) {
			deployed.fail(e);
		}
		return deployed;
	}
}

  • No labels