Info |
---|
BellaDati IoT Collector SDK is an API that lets you provide your own Receiver and Sender implementations. |
Warning |
---|
To build a valid, working Receiver or Sender distribution, you need to obtain the Developers distribution from BellaDati. |
Implementing own Receiver or Sender module
The BellaDati IoT Data Collector Developers distribution allows you to extend the implemented core classes GenericReceiver and GenericSender. Development process:
- Obtain the BellaDati IoT Data Collector Developers distribution (contact the BellaDati support team; it is not publicly available)
- Implement your own Receiver or Sender and your own MainVerticle
- Build the project into your own JAR module
- Run the built JAR and use it in your IoT Data Collector deployment
Tip |
---|
BellaDati IoT Data Collector is built using the Vert.x framework. |
Custom Receiver
Customizing the Receiver module is helpful when a custom endpoint type or customized message handling is necessary. Basic approach:
- Extend the GenericReceiver class
- Implement MainVerticle.class
- Override the initCustom(future) method with your own implementation; processMessage(..) and finishMessageProcessing(..) must be called inside your message handling code
- Alternatively, override getJsonFromBuffer(..)/getJsonFromContext(..) if only the message parsing needs to be customized
- Override the stop() method; the override must call super.stop()
- Set the endpoint type to CUSTOM (in the configuration file or using IoT Console)
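When only the payload format differs, overriding getJsonFromBuffer(..) may be all that is needed. The sketch below shows such a parsing step in isolation, using only the JDK so it can be tried without the Developers distribution. The payload format (`deviceId;temperature;humidity`), the class name `SemicolonPayloadParser` and the field names are hypothetical and not part of the SDK. In a real receiver the resulting map would presumably be wrapped in a Vert.x `JsonObject` (for example `new JsonObject(map)`) and returned from the override.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the BellaDati SDK:
// parses a "deviceId;temperature;humidity" payload into a JSON-like map.
class SemicolonPayloadParser {

    static Map<String, Object> parse(byte[] payload) {
        // Decode the raw bytes and drop surrounding whitespace such as a trailing newline
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        String[] parts = text.split(";");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields but got " + parts.length);
        }
        // LinkedHashMap keeps the field order stable, which makes logs easier to read
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("deviceId", parts[0].trim());
        json.put("temperature", Double.parseDouble(parts[1].trim()));
        json.put("humidity", Double.parseDouble(parts[2].trim()));
        return json;
    }
}
```

Inside an overridden getJsonFromBuffer(Buffer b), the bytes could be obtained with `b.getBytes()` before being passed to such a helper. Malformed input raises an IllegalArgumentException, which the receiver would need to catch or report as appropriate.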
Example (custom UDP server): Code Block |
---|
language | java |
---|
linenumbers | true |
---|
| package com.belladati.iot.collector.generic.receiver.verticle;
import com.belladati.iot.collector.generic.receiver.verticle.MessageProcessor.ProcessedMessage;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
public class CustomReceiver extends GenericReceiver {
    private DatagramSocket datagramSocket;
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#initCustom(io.vertx.core.Future)
     */
    @Override
    protected void initCustom(Future<Void> future) {
        // Fall back to an empty object so a missing "customConfig" section does not cause a NullPointerException
        JsonObject udpConfig = config().getJsonObject("customConfig", new JsonObject());
        int port = udpConfig.getInteger("port", 1234);
        String host = udpConfig.getString("host", "localhost");
        DatagramSocketOptions options = new DatagramSocketOptions(udpConfig.getJsonObject("options", new JsonObject()));
        datagramSocket = vertx.createDatagramSocket(options);
        datagramSocket.listen(port, host, ar -> {
            if (ar.succeeded()) {
                future.complete();
                log.info("Successfully listening on port " + port);
                // Handle every incoming datagram
                ar.result().handler(packet -> {
                    ProcessedMessage pm = processMessage(getJsonFromBuffer(packet.data()));
                    if (pm.isError() || pm.isIgnore()) {
                        log.error(pm.isIgnore() ? "message ignored" : pm.getError());
                        return;
                    }
                    log.info("Accepting message: " + pm.getMessage());
                    finishMessageProcessing(System.currentTimeMillis(), pm);
                });
            } else {
                future.fail(ar.cause());
                log.error("Datagram socket listen failed", ar.cause());
            }
        });
    }
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromContext(io.vertx.ext.web.RoutingContext)
     */
    @Override
    protected JsonObject getJsonFromContext(RoutingContext routingContext) {
        //TODO implement own message payload processing for HTTP
        return super.getJsonFromContext(routingContext);
    }
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.receiver.verticle.GenericReceiver#getJsonFromBuffer(io.vertx.core.buffer.Buffer)
     */
    @Override
    protected JsonObject getJsonFromBuffer(Buffer b) {
        //TODO implement own message payload processing for MQTT and UDP datagrams
        return super.getJsonFromBuffer(b);
    }
    /* (non-Javadoc)
     * @see io.vertx.core.AbstractVerticle#stop(io.vertx.core.Future)
     */
    @Override
    public void stop(Future<Void> future) throws Exception {
        // The socket is null if initCustom(..) never ran or failed early
        if (datagramSocket == null) {
            super.stop(future);
            return;
        }
        datagramSocket.close(ar -> {
            // super.stop(..) declares a checked exception, which a handler lambda cannot propagate
            try {
                super.stop(future);
            } catch (Exception e) {
                future.tryFail(e);
            }
        });
    }
}
|
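To try the example receiver, it helps to have a small client that sends a datagram to it. The following is a minimal sketch using only the JDK. The class name `UdpTestClient` and the message content are hypothetical; the host and port used in the note after the block match the defaults of the example above (`localhost`, `1234`) and should be adjusted to your customConfig.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical test client, not part of the BellaDati SDK.
class UdpTestClient {

    // Sends one UTF-8 encoded message as a single UDP datagram.
    static void send(String host, int port, String message) {
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        // The no-arg constructor binds to any free local port; try-with-resources closes the socket
        try (DatagramSocket socket = new DatagramSocket()) {
            DatagramPacket packet =
                    new DatagramPacket(data, data.length, InetAddress.getByName(host), port);
            socket.send(packet);
        } catch (IOException e) {
            // Wrap the checked exception so callers do not need a throws clause
            throw new UncheckedIOException(e);
        }
    }
}
```

A call such as `UdpTestClient.send("localhost", 1234, "{\"sensor\":\"demo\",\"value\":42}")` should then show up in the receiver log as an accepted, ignored or rejected message, depending on the configured rules. UDP gives no delivery confirmation, so the receiver log is the only place to verify the result.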
Custom Sender
Customizing the Sender module is helpful when a custom automated action is necessary. Basic approach:
- Extend the GenericSender class with your own implementation
- Implement the Action interface
- Override the createCustomAction() method and return your own Action implementation
- Implement MainVerticle.class
- Add action type CUSTOM to the configuration
Example: Code Block |
---|
language | java |
---|
linenumbers | true |
---|
| package com.belladati.iot.collector.generic.sender.verticle;
import java.util.function.Function;
import com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
// Note: "log" is not declared in this class; it is assumed to be provided by the Action interface
public class CustomAction implements Action {
    private boolean isInitialized;
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.sender.verticle.Action#init()
     */
    @Override
    public Function<Void, Future<Void>> init() {
        return v -> {
            log.info("Initializing action: " + getClass().getSimpleName());
            Future<Void> future = Future.future();
            //TODO implement initialisation if needed
            isInitialized = true;
            future.complete(v);
            return future;
        };
    }
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.sender.verticle.Action#close()
     */
    @Override
    public Function<Void, Future<Void>> close() {
        return v -> {
            log.info("Closing action: " + getClass().getSimpleName());
            Future<Void> future = Future.future();
            //TODO implement closing method if needed
            future.complete(v);
            return future;
        };
    }
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.sender.verticle.Action#process(com.belladati.iot.collector.generic.sender.verticle.ActionRulesChecker.ProcessedData)
     */
    @Override
    public Function<Boolean, Future<Boolean>> process(ProcessedData message) {
        return prevResult -> {
            // Skip processing when the previous step reported failure
            if (!prevResult) {
                return Future.succeededFuture(false);
            }
            JsonArray data = message.getData();
            Future<Boolean> future = Future.future();
            if (!data.isEmpty()) {
                //TODO implement own message processing method
                future.complete(true);
            } else {
                future.complete(false);
            }
            return future;
        };
    }
    /* (non-Javadoc)
     * @see com.belladati.iot.collector.generic.sender.verticle.Action#isInitialized()
     */
    @Override
    public boolean isInitialized() {
        return isInitialized;
    }
}
|
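The process(..) method in the example above returns a function from the previous result to a future result, which suggests that steps are run as a chain in which a false result short-circuits whatever follows. How GenericSender actually composes these functions is not shown here, so the following is only an illustrative sketch of that pattern. It uses the JDK's `CompletableFuture` as a stand-in for the Vert.x `Future` so that it runs without any dependencies (in Vert.x 3 the closest counterpart of `thenCompose` is `Future.compose`). The names `ActionChainSketch`, `step` and `run` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative only: mimics the shape of Action#process with JDK types.
class ActionChainSketch {

    // Builds a step that records its name when executed and then reports the given outcome.
    static Function<Boolean, CompletableFuture<Boolean>> step(
            String name, boolean outcome, List<String> executed) {
        return prevResult -> {
            // Same guard as in the CustomAction example: do nothing after a failed step
            if (!prevResult) {
                return CompletableFuture.completedFuture(false);
            }
            executed.add(name);
            return CompletableFuture.completedFuture(outcome);
        };
    }

    // Runs the steps in order, feeding each step the result of the previous one.
    static CompletableFuture<Boolean> run(
            List<Function<Boolean, CompletableFuture<Boolean>>> steps) {
        CompletableFuture<Boolean> result = CompletableFuture.completedFuture(true);
        for (Function<Boolean, CompletableFuture<Boolean>> s : steps) {
            result = result.thenCompose(s);
        }
        return result;
    }
}
```

With three steps whose outcomes are true, false and true, only the first two steps do any work and the chain as a whole resolves to false. This is why a custom process(..) implementation should check prevResult before acting on the message.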
Implement MainVerticle.class Code Block |
---|
language | java |
---|
linenumbers | true |
---|
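| import java.util.ArrayList;
import java.util.List;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
// Assumption: the Vert.x 3 logging facade is used; swap in your project's logger if it differs
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
// Place this class in the same package as CustomReceiver, or import CustomReceiver explicitly
public class MainVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(MainVerticle.class);
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        JsonObject config = config();
        JsonArray receivers = config.getJsonArray("receivers", new JsonArray());
        if (receivers.isEmpty()) {
            throw new IllegalArgumentException("No receivers specified");
        }
        // Deploy one CustomReceiver per configured receiver and wait for all of them
        List<Future> futures = new ArrayList<>();
        receivers.forEach(obj -> {
            futures.add(startChild(obj));
        });
        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.failed()) {
                log.error("Cannot deploy " + receivers.size() + " receivers. Shutting down ...", ar.cause());
                startFuture.fail(ar.cause());
                vertx.deploymentIDs().forEach(vertx::undeploy);
            } else {
                log.info("Deployed " + receivers.size() + " receivers - " + vertx.deploymentIDs());
                startFuture.complete();
            }
        });
    }
    private Future<String> startChild(Object jsonObject) {
        JsonObject receiver = (JsonObject) jsonObject;
        Future<String> future = Future.future();
        try {
            vertx.deployVerticle(CustomReceiver.class.getName(), new DeploymentOptions().setConfig(receiver), future.completer());
        } catch (Exception e) {
            future.fail(e);
        }
        return future;
    }
}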
|
|