May 1, 2015

Groovy Goodness: Share Data in Concurrent Environment with Dataflow Variables

Working with data in a concurrent environment can be complex. The Groovy distribution includes GPars, so we don't have to download any extra dependencies, and GPars provides several models that make it easier to work with data concurrently. In this blog post we look at an example where we use dataflow variables to exchange data between concurrent tasks. In a dataflow algorithm we define functions or tasks that have an input and an output. A task starts when its input is available. So instead of defining an imperative sequence of tasks that need to be executed, we define a series of tasks that start executing when their input is available. And the nice thing is that each of these tasks is independent and can run in parallel if needed.

The data that is shared between tasks is stored in dataflow variables. The value of a dataflow variable can only be set once, but it can be read multiple times. When a task wants to read a value that is not yet available, the task simply waits until the value has been set, without any explicit locking or synchronization in our code.
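The set-once, read-many behavior can be tried in isolation with a single DataflowVariable. The following is only a minimal sketch: the variable name answer and the values are made up for illustration, and it assumes GPars is on the classpath, as it is with the Groovy distribution used for this post.

```groovy
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

// Hypothetical variable, used only for this sketch.
final DataflowVariable<Integer> answer = new DataflowVariable<Integer>()

// The reader task is started first and waits
// until 'answer' is bound.
final reader = task {
    answer.val * 2
}

// Bind the value exactly once.
answer << 21

// The value can be read as often as we like.
assert answer.val == 21
assert answer.val == 21
assert reader.get() == 42

// Binding a different value afterwards is rejected.
boolean rejected = false
try {
    answer << 99
} catch (IllegalStateException expected) {
    rejected = true
}
assert rejected
assert answer.val == 21
```

The reader task is defined before the value exists, yet it still sees 21, and the second assignment does not change the value that was bound first.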

In the following example Groovy script we use the Dataflows class. This class provides an easy way to set multiple dataflow variables and get their values. In the script we want to get the temperature in a city in both Celsius and Fahrenheit, and we use remote web services to get the data:

import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

// Create new Dataflows instance to hold
// dataflow variables.
final Dataflows data = new Dataflows()

// Convert temperature from Celsius to Fahrenheit.
task {
    println "Task 'convertTemperature' is waiting for dataflow variable 'cityWeather'"

    // Get dataflow variable cityWeather value from
    // Dataflows data object. The value
    // is set by findCityWeather task.
    // If the value is not set yet, wait.
    final cityWeather = data.cityWeather
    final cityTemperature = cityWeather.temperature

    println "Task 'convertTemperature' got dataflow variable 'cityWeather'"

    // Convert value with webservice at
    // www.webservicex.net.
    final params = 
        [Temperature: cityTemperature, 
         FromUnit: 'degreeCelsius', 
         ToUnit: 'degreeFahrenheit']
    final url = "http://www.webservicex.net/ConvertTemperature.asmx/ConvertTemp"
    final result = downloadData(url, params)

    // Assign converted value to dataflow variable
    // temperature in Dataflows data object.
    data.temperature = result.text()
}

// Find temperature for city.
task {
    println "Task 'findCityWeather' is waiting for dataflow variable 'searchCity'"

    // Get value of the searchCity dataflow variable
    // from the Dataflows data object. It is
    // set in another task (startSearch)
    // at another time.
    // If the value is not set yet, wait.
    final city = data.searchCity

    println "Task 'findCityWeather' got dataflow variable 'searchCity'"

    // Get temperature for city with 
    // webservice at api.openweathermap.org.
    final params = 
        [q: city, 
         units: 'metric', 
         mode: 'xml']
    final url = "http://api.openweathermap.org/data/2.5/find"
    final result = downloadData(url, params)
    final temperature = result.list.item.temperature.@value

    // Assign map value to cityWeather dataflow 
    // variable in Dataflows data object. 
    data.cityWeather = [city: city, temperature: temperature]
}

// Get city part from search string.
task {
    println "Task 'parseCity' is waiting for dataflow variable 'searchCity'"

    // Get value of the searchCity dataflow variable
    // from the Dataflows data object. It is
    // set in another task (startSearch)
    // at another time.
    // If the value is not set yet, wait.
    final city = data.searchCity
    
    println "Task 'parseCity' got dataflow variable 'searchCity'"

    final cityName = city.split(',').first()

    // Assign to dataflow variable in Dataflows object.
    data.cityName = cityName
}

final startSearch = task {
    // Use command line argument to set
    // the searchCity dataflow variable in
    // the Dataflows data object.
    // Any task that was waiting to read
    // this value continues now,
    // because of this assignment.
    data.searchCity = args[0]  
}

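// When a variable is bound we log it.
// Invoking a dataflow variable on the Dataflows object
// with a closure registers a handler that is
// called with the value once the variable is bound.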
final printValueBound = { dataflowVar, value ->
    println "Variable '$dataflowVar' bound to '$value'" 
}
data.searchCity printValueBound.curry('searchCity')
data.cityName printValueBound.curry('cityName')
data.cityWeather printValueBound.curry('cityWeather')
data.temperature printValueBound.curry('temperature')


// Here we read the dataflow variables cityWeather, temperature
// and cityName from the Dataflows data object. Notice that once
// a value is set it is not calculated again. For example, reading
// cityWeather will not do a remote call again, because the value
// is already known by now.
println "Main thread is waiting for dataflow variables 'cityWeather', 'temperature' and 'cityName'"
final cityInfo = 
    data.cityWeather + [tempFahrenheit: data.temperature] + [cityName: data.cityName]


println """\

Temperature in city $cityInfo.cityName (searched with $cityInfo.city):
$cityInfo.temperature Celsius
$cityInfo.tempFahrenheit Fahrenheit
"""


// Helper method to get XML response from URL
// and parse it using XmlSlurper. Returns GPathResult.
def downloadData(requestUrl, requestParams) {
    // Build the query string and URL-encode each value,
    // so city names with spaces or special characters work.
    final params = requestParams.collect { key, value ->
        "${key}=${URLEncoder.encode(value.toString(), 'UTF-8')}"
    }.join('&')
    final url = "${requestUrl}?${params}"

    new XmlSlurper().parseText(url.toURL().text)
}

Now when we run the script we get the following output:

$ groovy citytemp.groovy Tilburg,NL
Task 'convertTemperature' is waiting for dataflow variable 'cityWeather'
Task 'parseCity' is waiting for dataflow variable 'searchCity'
Task 'findCityWeather' is waiting for dataflow variable 'searchCity'
Task 'findCityWeather' got dataflow variable 'searchCity'
Task 'parseCity' got dataflow variable 'searchCity'
Main thread is waiting for dataflow variables 'cityWeather', 'temperature' and 'cityName'
Variable 'searchCity' bound to 'Tilburg,NL'
Variable 'cityName' bound to 'Tilburg'
Task 'convertTemperature' got dataflow variable 'cityWeather'
Variable 'cityWeather' bound to '[city:Tilburg,NL, temperature:11.76]'
Variable 'temperature' bound to '53.167999999999985'

Temperature in city Tilburg (searched with Tilburg,NL):
11.76 Celsius
53.167999999999985 Fahrenheit

Notice how tasks are waiting for values and continue when they receive their input. The order of the definition of the tasks is not important, because they will wait for their input to start the real work.
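
To see the same effect without depending on remote web services, here is a small sketch that replaces the remote calls with local arithmetic. The variable names reading, celsius and fahrenheit and the input value are assumptions made for this illustration only; they are not part of the script above.

```groovy
import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

final Dataflows data = new Dataflows()

// Defined first, but can only finish last,
// because it needs 'celsius'.
task {
    data.fahrenheit = data.celsius * 9 / 5 + 32
}

// Defined second, waits for 'reading'.
task {
    data.celsius = data.reading.toBigDecimal()
}

// Defined last, yet its assignment is what
// allows the other two tasks to continue.
task {
    data.reading = '10'
}

// The main thread waits for the end result.
assert data.fahrenheit == 50
println "${data.celsius} Celsius is ${data.fahrenheit} Fahrenheit"
```

The tasks are written in the reverse order of their data dependencies, and the dataflow variables still make sure each task only continues once its input is bound.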

Written with Groovy 2.4.3.