BellaDati IoT Collector SDK is an API allowing to provide own Receiver and Sender implementations.
In order to build valid and working Receiver or Sender distribution, you need to get Developers distribution from BellaDati.
Implementing own Receiver or Sender module
BellaDati IoT Data Collector Developers distribution allows to extend implemented core classes GenericReceiver and GenericSender.
Development process:
- Obtain BellaDati IoT Data Collector Developers distribution (contact BellaDati support team, it's not publicly available)
- Implement own Receiver or Sender implementation and owne MainVerticle
- Build project and your own JAR module
- Run built JAR and use it in your IoT Data Collector deployment
BellaDati IoT Data Collector is built using vert.x framework.
Custom Receiver
Customizing Receiver module is helpful in situations, when custom endpoint type or customized message handling is necessary.
Basic approach:
- extend class GenericReceiver
- implement MainVerticle.class
- override initCustom(future) method with own implementation
- processMessage(..) and finishProcessing(..) must be called inside message handling implementation
- or override getJsonFromBuffer(..)/getJsonFromContext(..) if only message processing is needed to be customized
- override stop() method, super.stop() must be included
- set endpoint type to CUSTOM (in file configuration or using IoT Console)
Example (custom UDP server):
package com.belladati.iot.collector.generic.receiver.verticle; import com.belladati.iot.collector.common.util.VerticleStatus; import com.belladati.iot.collector.generic.receiver.verticle.MessageProcessor.ProcessedMessage; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocketOptions; import io.vertx.core.json.JsonObject; public class CustomReceiver extends GenericReceiver { private DatagramSocket datagramSocket; /* (non-Javadoc) * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#initCustom(io.vertx.core.Future) */ @Override protected void initCustom(Future<Void> future) { JsonObject udpConfig = config().getJsonObject("customConfig"); int port = udpConfig.getInteger("port", 1234); String host = udpConfig.getString("host", "localhost"); DatagramSocketOptions options = new DatagramSocketOptions(udpConfig.getJsonObject("options", new JsonObject())); datagramSocket = vertx.createDatagramSocket(options); datagramSocket.listen(port, host, ar -> { if (ar.succeeded()) { future.complete(); log.info("Successfully listening on port " + port); ar.result().handler(packet -> { ProcessedMessage pm = processMessage(getJsonFromBuffer(packet.data())); if (pm.isError() || pm.isIgnore()) { log.error(pm.isIgnore() ? "message ignored" : pm.getError()); return; } log.info("Accepting message: " + pm.getMessage()); finishMessageProcessing(System.currentTimeMillis(), pm); }); } else { future.fail(ar.cause()); log.error("Datagram socket listen failed", ar.cause()); } }); } /* (non-Javadoc) * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromContext(io.vertx.ext.web.RoutingContext) */ @Override protected JsonObject getJsonFromContext(RoutingContext routingContext) { //TODO implement own message payload processing for HTTP return super.getJsonFromContext(routingContext); } /* (non-Javadoc) * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromBuffer(io.vertx.core.buffer.Buffer) */ @Override protected JsonObject getJsonFromBuffer(Buffer b) { //TODO implement own message payload processing for MQTT and UDP datagrams return super.getJsonFromBuffer(b); } /* (non-Javadoc) * @see io.vertx.core.AbstractVerticle#stop() */ @Override public void stop(Future<Void> future) throws Exception { datagramSocket.close(ar -> { super.stop(future); }); } }
Custom Sender
Customizing Sender module is helpful in situations, when custom automated action is necessary.
Basic approach:
- extend GenericSender class with own implementation
- implement Action interface
- override createCustomAction() method and return own Action class
- implement MainVerticle.class
- add action type CUSTOM to the configuration
Example:
package com.belladati.iot.collector.generic.sender.verticle; import java.io.File; import java.time.Instant; import java.util.function.Function; import com.belladati.iot.collector.common.BellaDatiVerticle; import com.belladati.iot.collector.common.util.BellaDatiUtils; import com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonArray; public class CustomAction implements Action { private boolean isInitialized; /* (non-Javadoc) * @see com.belladati.iot.collector.generic.sender.verticle.Action#init() */ @Override public Function<Void, Future<Void>> init() { return v -> { log.info("Initializing action: " + getClass().getSimpleName()); Future<Void> future = Future.future(); //TODO implement initialisation if needed isInitialized = true; future.complete(v); return future; }; } /* (non-Javadoc) * @see com.belladati.iot.collector.generic.sender.verticle.Action#close() */ @Override public Function<Void, Future<Void>> close() { return v -> { log.info("Closing action: " + getClass().getSimpleName()); Future<Void> future = Future.future(); //TODO implement closing method if needed future.complete(v); return future; }; } /* (non-Javadoc) * @see com.belladati.iot.collector.generic.sender.verticle.Action#process(com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData) */ @Override public Function<Boolean, Future<Boolean>> process(ProcessedData message); return prevResult -> { if (!prevResult) { return Future.succeededFuture(false); } JsonArray data = message.getData(); Future<Boolean> future = Future.future(); if (!data.isEmpty()) { //TODO implement own message processing method future.complete(true); } else { future.complete(false); } return future; }; return true; } /* (non-Javadoc) * @see com.belladati.iot.collector.generic.sender.verticle.Action#isContinueOnFailure() */ @Override public boolean isInitialized() { return isInitialized; } }
Implement MainVerticle.class
public class MainVerticle extends AbstractVerticle { private static final Logger log = LoggerFactory.getLogger(MainVerticle.class); @Override public void start(Future<Void> startFuture) throws Exception { JsonObject config = config(); JsonArray receivers = config.getJsonArray("receivers", new JsonArray()); if (receivers.isEmpty()) { throw new IllegalArgumentException("No receivers specified"); } List<Future> futures = new ArrayList<>(); receivers.forEach(obj -> { futures.add(startChild(obj)); }); CompositeFuture.all(futures).setHandler(ar -> { if (ar.failed()) { log.error("Cannot deploy " + receivers.size() + " receivers. Shutting down ...", ar.cause()); startFuture.fail(ar.cause()); vertx.deploymentIDs().forEach(vertx::undeploy); } else { log.info("Deployed " + receivers.size() + " receivers - " + vertx.deploymentIDs()); startFuture.complete(); } }); } private Future<String> startChild(Object jsonObject) { JsonObject receiver = (JsonObject) jsonObject; Future<String> future = Future.future(); try { vertx.deployVerticle(CustomReceiver.class.getName(), new DeploymentOptions().setConfig(receiver), future.completer()); } catch (Exception e) { future.fail(e); } return future; } }
Overview
Content Tools