Introduction

This tutorial shows how to update the information in a data set using the API. There are two primary ways to load information into a data set:

  1. Connecting over HTTP: data is loaded into the data set by making HTTP requests. For a comprehensive guide on connecting over HTTP, please refer to this link.

  2. Using ML Studio: information is loaded into a data set based on existing data. This tutorial focuses on this method and covers it in detail.

Configuring Data Retrieval

Before implementing anything, we need to define the requirements of the resource we fetch information from. In our case, we need to send an HTTP GET request with an API key to the service's address and handle the response. The key can be configured dynamically in the domain parameters.

To meet these requirements, we create a request configuration that includes a variable for the API key.

Navigate to the "Administration" menu, select "Request configuration", and create a new configuration. The JSON configuration looks as follows:

{"url": "https://api-service.com/v1/forecastPrefWeek?code=$code&key=$apiKey","methodType":"GET"}

Select the "Public" checkbox for this configuration and save it. The request body is not needed in this case, so leave the "Payload Template" field untouched.
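
The `$code` and `$apiKey` tokens in the URL are placeholders that are filled in from the variables passed with each request. The platform performs this substitution itself; the sketch below only illustrates the idea. The helper name `expandTemplate`, the URL encoding of values, and the sample values are assumptions made for the example, not part of the product API.

```groovy
import java.net.URLEncoder

// Hypothetical helper: replaces each $name token with the URL-encoded
// value of variables[name]. Unknown names are left untouched so that
// a misspelled placeholder stays visible in the resulting URL.
String expandTemplate(String template, Map<String, Object> variables) {
  template.replaceAll('\\$(\\w+)') { whole, name ->
    variables.containsKey(name) ? URLEncoder.encode(variables[name].toString(), 'UTF-8') : whole
  }
}

def url = expandTemplate(
  'https://api-service.com/v1/forecastPrefWeek?code=$code&key=$apiKey',
  [code: '13', apiKey: 'demo key']   // made-up sample values
)
println url
```

Note that the template is written in single quotes, so Groovy itself does not try to interpolate `$code` and `$apiKey`.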

Next, add a domain parameter. Click on your profile icon and select the domain name. On the right, you'll find a button for adding parameters. Add a new parameter with the name "weather_api_key" and the corresponding value.

Now, let's move on to creating an ML script for data processing. Based on the existing data, the script sends an HTTP request for each region in a loop to retrieve weather information, then processes the response and updates the data. Here's an example of the code.

import org.joda.time.*;

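// Read the API key from the domain parameter added earlier.
apiKey = currentDomain().getParameter('weather_api_key').getValue();
// ID of the request configuration created above; replace 9 with your own ID.
requestConfigId = 9;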

weatherDs = [
  id: 'WEATHER_DATA_WEEKLY',
  col: [
    date: 'L_FORECAST_DATE',
    prefecture: 'L_PREFECTURE_CODE',
    weather: 'L_WEATHER',
    weatherCode: 'L_WEATHER_CODE',
    tempMax: 'M_TEMPERATURE_MAX',
    tempMin: 'M_TEMPERATURE_MIN',
    precipitation: 'M_PRECIPITATION',
  ],
];

siteDs = [
  id: 'CONSTRUCTION_SITES',
  col: [
    gps: 'L_GPS',
    prefectureCode: 'L_PREF_CODE',
    yumakeCode: 'L_YUMAKE_CODE',
    regionCode: 'L_REGION_CODE',
  ],
];

def hasGpsFilter = isNotNullFilter(siteDs.col.gps);
def hasPrefectureCodeFilter = isNotNullFilter(siteDs.col.prefectureCode);
def siteFilter = andFilter(hasGpsFilter, hasPrefectureCodeFilter);

def sites = readDataset(siteDs.id, -1, siteFilter, descSort('ROW_UID'));

if (sites.isEmpty()) {
  return warn('Site list does not contain prefecture codes. Skipping...');
}

def prefectures = sites.collect{ it[siteDs.col.prefectureCode] }.unique(false);

prefectures.each{ code ->
  if (!(code instanceof String)) return;
  
  def variables = [
    code: code,
    apiKey: apiKey,
  ];
  
  def requestId = httpRequestAsync(requestConfigId, null, variables);
  serviceResponse(requestId, 500, 10000) {
    if (!responseRetrieved) {
      return warn("No response retrieved for the prefecture code ${code}");
    }
    
    if (httpStatusCode != 200) {
      warn("Status code: '${httpStatusCode}'");
      warn("Failed to retrieve weather data for the prefecture code: ${code}");
      return;
    }
    
    def response = jsonToMap(httpBody);
    
    if (response.errors?.size()) {
      response.errors.each{ warn("Request failed with error: ${it.message ?: 'unknown error'}") }
      return;
    }
    
    handleResponse(response);
  }
}

void handleResponse(response) {
  response.area.each{ area ->
    def rows = [];
    def prefectureCode = area.areaCode;
    
    area.forecastDateTime.eachWithIndex{ date, idx ->
      rows.add([
        (weatherDs.col.date): new LocalDate(date.take(10)),
        (weatherDs.col.prefecture): prefectureCode,
        (weatherDs.col.weather): area.weather[idx],
        (weatherDs.col.weatherCode): area.weatherCode[idx],
        (weatherDs.col.tempMax): area.temperatureMax[idx],
        (weatherDs.col.tempMin): area.temperatureMin[idx],
        (weatherDs.col.precipitation): area.precipitation[idx],
      ]);  
    }
    
    def dateFrom = rows.collect{ it[weatherDs.col.date] }.min();
    def dateFromFilter = isGreaterOrEqualFilter(weatherDs.col.date, dateFrom);
    def prefectureFilter = isEqualFilter(weatherDs.col.prefecture, prefectureCode);
    def weatherFilter = andFilter(prefectureFilter, dateFromFilter);
    
    deleteData(weatherDs.id, weatherFilter);
    storeDataset(weatherDs.id, rows);
  }
}

void warn(String data) {
  println("[WARN] ${data}");
}

void dev(String data) {
  println("[DEV] ${data}");
}
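
To make the reshaping in handleResponse easier to follow, here is a self-contained sketch of the same two steps: turning the parallel arrays of one area into rows, then replacing the stored rows for that prefecture from the earliest forecast date onward. It runs outside the platform, so a plain list stands in for the data set, `java.time.LocalDate` stands in for Joda-Time, and the sample payload, the `toRows` helper, and the short column names are all made up for illustration.

```groovy
import java.time.LocalDate

// Sample payload shaped like the response handled above (values are made up).
def response = [
  area: [[
    areaCode: '13',
    forecastDateTime: ['2024-05-02T00:00:00+09:00', '2024-05-01T00:00:00+09:00'],
    weather: ['Sunny', 'Cloudy'],
    temperatureMax: [24, 21],
  ]]
]

// Hypothetical helper: turns the parallel arrays of one area into row maps.
List<Map> toRows(Map area) {
  def rows = []
  area.forecastDateTime.eachWithIndex { String date, int idx ->
    rows << [
      date: LocalDate.parse(date.take(10)),   // keep only the yyyy-MM-dd part
      prefecture: area.areaCode,
      weather: area.weather[idx],
      tempMax: area.temperatureMax[idx],
    ]
  }
  rows
}

// Stand-in for the stored data set: a plain in-memory list.
def stored = [
  [date: LocalDate.parse('2024-04-30'), prefecture: '13', weather: 'Rain', tempMax: 18],
  [date: LocalDate.parse('2024-05-01'), prefecture: '13', weather: 'Rain', tempMax: 19],
  [date: LocalDate.parse('2024-05-01'), prefecture: '27', weather: 'Sunny', tempMax: 23],
]

response.area.each { area ->
  def rows = toRows(area)
  def dateFrom = rows*.date.min()
  // Same prefecture and date >= dateFrom: drop the stale forecast rows,
  // keep older history and other prefectures untouched.
  stored.removeAll { it.prefecture == area.areaCode && !it.date.isBefore(dateFrom) }
  stored.addAll(rows)
}

stored.each { println it }
```

Deleting from the earliest returned date onward, rather than deleting everything for the prefecture, keeps past rows intact while letting each run overwrite the forecast window.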

Pay attention to how we retrieve parameters from the current domain at the beginning of the script.

apiKey = currentDomain().getParameter('weather_api_key').getValue();


Also note how the variables are passed for substitution into the JSON request configuration and how the response is processed.

// Variables substituted into the $code and $apiKey placeholders
def variables = [
  code: code,
  apiKey: apiKey,
];

// Pass the variables for substitution in the JSON request configuration
def requestId = httpRequestAsync(requestConfigId, null, variables);
serviceResponse(requestId, 500, 10000) {
  if (!responseRetrieved) {
    return warn("No response retrieved for the prefecture code ${code}");
  }
  
  if (httpStatusCode != 200) {
    warn("Status code: '${httpStatusCode}'");
    warn("Failed to retrieve weather data for the prefecture code: ${code}");
    return;
  }
  
  def response = jsonToMap(httpBody);
  
  if (response.errors?.size()) {
    response.errors.each{ warn("Request failed with error: ${it.message ?: 'unknown error'}") }
    return;
  }
  
  handleResponse(response);
}
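
The tutorial does not state what the two numeric arguments of serviceResponse mean. If they are a polling interval and a timeout in milliseconds (an assumption, not confirmed by the source), the wait-for-response pattern would resemble the sketch below. `pollUntil` is a hypothetical helper written for this example and is not part of the platform API; the closure simulates a response that becomes available on the third check.

```groovy
// Hypothetical helper: calls check() every intervalMs milliseconds until it
// returns a non-null value or the timeout would be exceeded.
// Returns the value, or null if nothing arrived in time.
def pollUntil(long intervalMs, long timeoutMs, Closure check) {
  long deadline = System.currentTimeMillis() + timeoutMs
  while (true) {
    def result = check()
    if (result != null) return result
    // Give up if the next sleep would run past the deadline.
    if (System.currentTimeMillis() + intervalMs > deadline) return null
    sleep(intervalMs)
  }
}

// Simulated response source: nothing on the first two checks, a body on the third.
int attempts = 0
def body = pollUntil(10, 1000) { ++attempts >= 3 ? '{"ok":true}' : null }

if (body == null) {
  println '[WARN] No response retrieved'
} else {
  println "Response after ${attempts} checks: ${body}"
}
```

A null result here plays the same role as the responseRetrieved check in the script: the caller logs a warning and moves on to the next prefecture instead of failing the whole run.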

Configuring Automatic Data Updates

To run the script automatically on a schedule, we need a data set that acts as the trigger. To get the desired refresh frequency, we import data into it from another data set on a schedule. That data set can hold just a single record, as long as the import is configured so that it always yields a random value and the trigger data set therefore changes on every import.

To connect data sources, you can refer to the documentation here.

Once the data source is connected, you need to set up the import scheduler. Here's how you can do it:

  1. Go to the "Data set summary" page of the trigger data set and click the "Schedule import" button.
  2. Fill in all the necessary fields and confirm by clicking "Schedule." If you need to initiate multiple imports, you can create several processes by clicking the "Schedule import" button again.

An example of the result:

Next, to trigger the ML project based on this data set, you need to wrap it in a pipeline. You can find instructions on creating a pipeline here.

Once the pipeline is set up, add the trigger; you may need to refresh the page after creating the pipeline. In the pop-up window, select the data set that will act as the trigger, click the "Add" button, and close the window.

You can test the trigger's functionality by making changes to the trigger data set.
