ML Studio can execute parts of your project asynchronously. This is provided by the Vert.x framework, which wraps the project in a verticle to support asynchronous processing. Any part of the project that should run asynchronously must be wrapped in the async function. async takes a callback function as its first parameter and the argument to pass to that callback as its second parameter, and it returns a Future.

Asynchronous processing must first be enabled in the project settings. If this option is disabled, async still executes the callback, but synchronously.
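The contract described above can be illustrated with plain JDK classes. This is a minimal sketch, not ML Studio's implementation. asyncSketch and its enabled flag are hypothetical names, and java.util.concurrent.CompletableFuture stands in for the Vert.x Future that async actually returns.

```groovy
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for ML Studio's async(callback, arg); 'enabled' models the project setting.
// Enabled: the callback runs on a pool thread. Disabled: it runs right away on the calling thread
// and an already-completed future is returned, so callers can treat both cases the same way.
CompletableFuture asyncSketch(Closure callback, Object arg, boolean enabled) {
  if (enabled) {
    return CompletableFuture.supplyAsync { callback(arg) }
  }
  return CompletableFuture.completedFuture(callback(arg))
}

def square = { int x -> x * x }

def f1 = asyncSketch(square, 7, true)   // computed asynchronously
def f2 = asyncSketch(square, 7, false)  // computed synchronously, already complete
println f2.isDone()                     // true
println f1.get() + f2.get()             // 98 (get() waits for f1 if needed)
```

Both branches return a future, so code written against async keeps working when the setting is turned off.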

A typical use case is a demanding computation that has to be executed many times, for example once for every row of a dataset. The following example reads the rows of a dataset, checks whether the number in each row is a prime number, and stores the result back in the dataset.

Here is the standard synchronous version, without the async function:

import groovy.time.TimeCategory 
import groovy.time.TimeDuration
// Returns the smallest divisor of n greater than 1, or 0 if n is prime (assumes n >= 2)
int isPrime(bd) {
  int n = bd.intValue()
  for (int d = 2; d < n; d++) {
    if (n % d == 0) {
      return d
    }
  }
  return 0
}
List dataList = readDataset(339, -1, null, null)
Date start = new Date()
dataList.each { row ->
  int d = isPrime(row['M_NUMBER'])
  if (d == 0) {    
    row.put('L_IS_PRIME', 'Yes')
  } else {    
    row.put('L_IS_PRIME', 'No')
    row.put('M_DIVIDER', d)
  }
  row.put('L_EVALUTED_AT', currentLocalDateTime())
}
Date stop = new Date()
updateDataset(339, dataList)
TimeDuration td = TimeCategory.minus( stop, start )
println td
"" 
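
To try the synchronous loop outside ML Studio, readDataset, updateDataset and currentLocalDateTime can be replaced with plain Groovy. This sketch assumes the rows are maps whose M_NUMBER value is a BigDecimal, as the bd parameter suggests. The sample numbers are made up, LocalDateTime.now() stands in for currentLocalDateTime(), and isPrime is renamed to smallestDivisor to reflect what it returns.

```groovy
import java.time.LocalDateTime

// Returns the smallest divisor of n greater than 1, or 0 if n is prime (assumes n >= 2).
int smallestDivisor(BigDecimal bd) {
  int n = bd.intValue()
  for (int d = 2; d < n; d++) {
    if (n % d == 0) {
      return d
    }
  }
  return 0
}

// Hypothetical in-memory rows standing in for readDataset(339, ...)
List<Map> dataList = [7, 15, 97, 221].collect { [M_NUMBER: new BigDecimal(it)] }

long start = System.nanoTime()
dataList.each { row ->
  int d = smallestDivisor(row['M_NUMBER'])
  if (d == 0) {
    row.put('L_IS_PRIME', 'Yes')
  } else {
    row.put('L_IS_PRIME', 'No')
    row.put('M_DIVIDER', d)
  }
  row.put('L_EVALUTED_AT', LocalDateTime.now())
}
long elapsedMs = (System.nanoTime() - start).intdiv(1_000_000)

// Print each row instead of calling updateDataset
dataList.each { println "${it.M_NUMBER}: ${it.L_IS_PRIME} ${it.M_DIVIDER ?: ''}" }
println "took ${elapsedMs} ms"
```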

And here is the same computation using async:

import groovy.time.TimeCategory 
import groovy.time.TimeDuration
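// Returns the smallest divisor of n greater than 1, or 0 if n is prime (assumes n >= 2).
// d must be declared locally: an undeclared d would resolve to a variable shared by the
// whole script, which is unsafe when several rows are processed in parallel.
int isPrime(bd) {
  int n = bd.intValue()
  for (int d = 2; d < n; d++) {
    if (n % d == 0) {
      return d
    }
  }
  return 0
}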
List futures = [] // #1
List dataList = readDataset(339, -1, null, ascSort('ROW_UID'))
Date start = new Date()
dataList.each { r ->	
  futures.add( 
    async({ row -> // #2
      int d = isPrime(row['M_NUMBER'])
      if (d == 0) {        
        row.put('L_IS_PRIME', 'Yes')
      } else {     
        row.put('L_IS_PRIME', 'No')
        row.put('M_DIVIDER', d)
      }
      row.put('L_EVALUTED_AT', currentLocalDateTime())
    }, r)
  )
}
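io.vertx.core.CompositeFuture.all(futures).onComplete({ e -> // #3
  if (e.failed()) {
    println "Evaluation failed: ${e.cause()}"
    return
  }
  Date stop = new Date()  // stop the clock before writing, as in the synchronous version
  updateDataset(339, dataList)
  TimeDuration td = TimeCategory.minus(stop, start)
  println td
})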
"" 
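
The same parallel pattern can be reproduced with JDK classes for experimentation. This sketch is an analogue, not what ML Studio does internally. CompletableFuture.runAsync on a fixed thread pool plays the role of async, and CompletableFuture.allOf plays the role of CompositeFuture.all. The rows are a hypothetical in-memory list of maps with BigDecimal M_NUMBER values.

```groovy
import java.time.LocalDateTime
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Returns the smallest divisor of n greater than 1, or 0 if n is prime (assumes n >= 2).
int smallestDivisor(BigDecimal bd) {
  int n = bd.intValue()
  for (int d = 2; d < n; d++) {   // d is local, so concurrent calls do not interfere
    if (n % d == 0) return d
  }
  return 0
}

// Hypothetical in-memory rows standing in for readDataset(339, ...)
List<Map> dataList = [7, 15, 97, 221].collect { [M_NUMBER: new BigDecimal(it)] }
ExecutorService pool = Executors.newFixedThreadPool(4)

// Counterpart of #1 and #2: one future per row; each task only touches its own row map.
List<CompletableFuture> futures = dataList.collect { r ->
  CompletableFuture.runAsync({
    int d = smallestDivisor(r['M_NUMBER'])
    r.put('L_IS_PRIME', d == 0 ? 'Yes' : 'No')
    if (d != 0) r.put('M_DIVIDER', d)
    r.put('L_EVALUTED_AT', LocalDateTime.now())
  } as Runnable, pool)
}

// Counterpart of #3: run a follow-up action once every future has completed.
CompletableFuture done = CompletableFuture.allOf(futures as CompletableFuture[])
  .thenRun { println "all ${dataList.size()} rows evaluated" }

done.join()      // wait, because a stand-alone script would otherwise exit early
pool.shutdown()  // let the JVM exit once the pool threads are idle
```

The final join() is needed only in a stand-alone script. The ML Studio version instead does its follow-up work inside the onComplete callback.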

Explanation of the marked lines:

#1 - Since the script needs to wait for all futures to finish, the futures are collected in a list.

#2 - The first parameter of the async function is a function with one parameter (function(row)), and the second parameter is the row that is passed to that function.

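#3 - Wait for all futures to finish, then execute a callback. The callback function has one parameter of type AsyncResult. The script does not block at this point: it returns immediately, and the callback runs once all futures have completed.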
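
In Vert.x, the AsyncResult passed to the #3 callback reports whether the composite future succeeded or failed through succeeded(), failed() and cause(). CompositeFuture.all fails if any of its futures fails. The JDK-only sketch below mimics that outcome check with CompletableFuture.handle, whose callback receives either a result or an error. The failing task is hypothetical. One difference is that allOf completes only after every task has finished, even when one has already failed.

```groovy
import java.util.concurrent.CompletableFuture

// One task succeeds, the other throws (a hypothetical per-row failure).
CompletableFuture ok = CompletableFuture.supplyAsync { 42 }
CompletableFuture bad = CompletableFuture.supplyAsync { throw new IllegalStateException('row failed') }

// handle() receives (result, error); error is null on success, like AsyncResult.succeeded().
// On failure, allOf wraps the original exception in a CompletionException, so unwrap it with getCause().
String outcome = CompletableFuture.allOf(ok, bad).handle { ignored, error ->
  error == null ? 'succeeded' : "failed: ${error.cause?.message}".toString()
}.join()

println outcome
```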

