Lambda IV: Use the Power of Lambda
In my last post in this series, we developed all the building blocks for our data science application - the only thing missing is the parallelisation setup.
This is part four of my series on Amazon Lambda for Data Science. To see how this all comes together, we are developing a real-world data science application over the course of this blog post series.
- Introduction: Why you should use Amazon Lambda
- Part 2: Simple map framework for Amazon Lambda using zappa
- Part 3: Data Science with Amazon Lambda
- Part 4: Use the Power of Lambda
You can find the code of this series on my github account (it will be updated as the series goes on). The version for this post is stored in the tag part-four.
The application is running and working as expected - but where is the benefit of using possibly 1000 parallel invocations of Amazon Lambda if we run our code sequentially on only one core? Before we go into the details, let me give you some first remarks on this subject:
- In this tutorial I will assume that your data science code can easily be parallelised for a given data set - in our case, the feature calculation can run independently for the different `id`s of the data set. This may not be the case in your use case.
- The benefit of parallelisation always comes with the drawback of overhead due to process management (or Lambda function management in our case) and streaming (which is even worse in our case, as we have to stream over the network to the next Lambda function instead of only streaming to a different process). Do not underestimate this! We will see this effect clearly when we do some performance studies.
- The current limit for parallel executions of Lambda functions is 1000 invocations. If we assume something like up to 10 needed Lambda invocations per request, we can handle 100 users in parallel before requests need to be throttled. This is probably enough if you are running a small data science application, but it will not be sufficient in large scale applications. You can in principle ask the Amazon support to increase this limit, but as discussed in the first post of this series, it may be beneficial to use other options (e.g. Amazon EC2). The good thing: the developed `flask` application runs without any problems on EC2 - you just have to change the way the parallelisation is done (e.g. use a multiprocessing `Pool` or packages like `distributed`).
After this is settled, let's start!
One - Two - Many
The important parts of the current version of the `my_map` function look as follows:
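A minimal sketch of its shape (the exact signature in the repository may differ slightly) - it simply maps the given function over the chunk list sequentially:

```python
def my_map(f, chunk_list):
    # sequential execution: every chunk is processed on this single core
    return [f(chunk) for chunk in chunk_list]
```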
We will replace the `map` call with the following (pseudo-like code):
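Conceptually it does this (all names here are placeholders - the real implementation follows below):

```python
# pseudo-code: start one Lambda per chunk in parallel threads and
# collect the results once every invocation has returned
threads = [start_lambda_in_thread(map_function, chunk) for chunk in chunk_list]
results = [thread.wait_for_result() for thread in threads]
```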
We will start all Lambda functions in parallel, because each invocation will block until it is finished. We are not using an event- or callback-based workflow here - although it would be possible (but harder to implement).
We will start by writing the packer and unpacker functions.
Packing and Unpacking
We will need to transport the input chunk list to the Lambda function and the results of the calculation back from it. So we need methods for packing up arbitrary python objects into a string that can be transported in JSON (because this is how the Lambdas will communicate). We will make a very strong assumption here: everything we want to stream over the network will always be in the form of a list of dicts of native python types. This is quite limiting for our implementation, but if you think about it, it is exactly what we need in the case of data science: everything can be broken down to lists of numbers and strings.
However, we first have to make sure that our `calculate_result` function really outputs a list of dictionaries to the `my_map` function. We will do this by including the following code snippet right below the `groupby`:
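A sketch of that snippet - I am assuming here that the grouped data frame iterates as (`id`, `data`) tuples and that the value column is named `value`:

```python
# turn the grouped pandas data frame into a list of dicts of native python types
chunks = [
    {"id": chunk_id, "value": list(data["value"])}
    for chunk_id, data in chunks
]
```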
I am reassigning to the same variable, so you do not need to change the rest of the code, but you can of course also change this. The snippet takes the grouped pandas data frame (which is basically a list of tuples in the form (`id`, `data`)) and turns it into a list of dictionaries, where the data is not a pandas dataframe but a list of python base types.
We also need to refine our `feature_calculation` function on the other side of the pipe to use the dictionary instead of the tuple:
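For example like this - the concrete feature calculations are of course the ones from the previous posts, I am only sketching the changed unpacking with placeholder features here:

```python
import numpy as np

def feature_calculation(chunk):
    # read id and data out of the dict instead of unpacking an (id, data) tuple
    chunk_id = chunk["id"]
    data = np.asarray(chunk["value"])

    # placeholder features - plug in the real calculations here
    return {"id": chunk_id, "mean": float(np.mean(data)), "max": float(np.max(data))}
```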
Now we can define our streamer functions, e.g. like this (in `zappa_distributor.py`):
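One possible implementation - the names `encode_payload` and `decode_payload` are my choice; JSON plus optional `zlib` compression (base64-encoded to stay JSON-safe) is a straightforward way to do it:

```python
import base64
import json
import zlib

def encode_payload(payload, compression=False):
    """Pack a list of dicts of native python types into a JSON-safe string."""
    json_string = json.dumps(payload)
    if compression:
        # zlib returns bytes, so base64-encode to get a string again
        return base64.b64encode(zlib.compress(json_string.encode())).decode()
    return json_string

def decode_payload(encoded_payload, compression=False):
    """Inverse of encode_payload."""
    if compression:
        encoded_payload = zlib.decompress(base64.b64decode(encoded_payload)).decode()
    return json.loads(encoded_payload)
```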
Notice that I have already included a flag for compression, so we can later test whether it helps or not. You can also make this compression flag configurable, like the value or id column names.
We can now already start putting everything together - although we still need a method to invoke another lambda function (but we can cheat and just call the function itself instead of calling it in another lambda instance).
So we add three new functions to `zappa_distributor.py`:
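Roughly like this - a sketch assuming a thread pool for the parallel calls and the streamer helpers from above:

```python
from functools import partial
from multiprocessing.pool import ThreadPool

def distribute_to_lambda(map_function, chunk_list, compression=False):
    """Our replacement for the plain map: run map_function on every chunk
    in parallel, each one (later) in its own Lambda invocation."""
    run = partial(run_lambda, map_function, compression=compression)
    with ThreadPool() as pool:
        return pool.map(run, chunk_list)

def run_lambda(map_function, data, compression=False):
    """Will later invoke a new Lambda - for now we cheat and call the
    wrapped function directly in this process."""
    encoded_data = encode_payload(data, compression=compression)
    encoded_result = function_in_lambda(map_function, encoded_data,
                                        compression=compression)
    return decode_payload(encoded_result, compression=compression)

def function_in_lambda(map_function, encoded_data, compression=False):
    """Hide the streaming and encoding from the actual map_function."""
    data = decode_payload(encoded_data, compression=compression)
    result = map_function(data)
    return encode_payload(result, compression=compression)
```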
The `distribute_to_lambda` function is our replacement for the `map` in `my_map` (and you should already replace it with this). It calls the function that invokes another lambda on each data chunk separately, but in parallel (using threading). The `run_lambda` function will (later) invoke a new lambda and execute the `map_function` on it. As this function should not see the streaming and encoded data, we wrap it with the `function_in_lambda`.
You can already go and test this setup (either locally or remotely, as you like). You will notice (especially if you run it locally) that the execution time is already longer than before. This comes solely from the streaming and packing of the data - although we are not streaming over the network yet.
Short amendment
Before going on, I have noticed that we make our life a lot easier if we move the call to `feature_calculation_on_chunks` into `function_in_lambda` instead of defining the `map_function` as a partial function (because that is not pickle-able anymore). So we throw away the partial definition in the `my_map` function and just rename the argument `f` to `map_function`. In `function_in_lambda`, we replace the `map_function` call as shown in the sketch below.
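A sketch of the amended `function_in_lambda` - I am assuming here that `feature_calculation_on_chunks` takes the decoded chunk list plus the plain function to apply:

```python
def function_in_lambda(map_function, encoded_data, compression=False):
    data = decode_payload(encoded_data, compression=compression)
    # the feature_calculation_on_chunks call lives here now, so map_function
    # can stay a plain, importable (and therefore pickle-able) function
    result = feature_calculation_on_chunks(data, map_function)
    return encode_payload(result, compression=compression)
```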
As these were quite a few changes in total, I have added another git tag called part-four-middle, where you can see the status of the code so far.
Let’s invoke ourselves!
It's time we fill in our last missing part! To make this part configurable, we add another option called `invoke_lambda` to `run_lambda` (as well as to `distribute_to_lambda` and to `my_map`, to make it configurable in `calculate_result`). With this new parameter, our `run_lambda` body looks something like this:
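Something along these lines (a sketch - the `get_func_task_path` conversion in it is explained in a second):

```python
def run_lambda(map_function, data, compression=False, invoke_lambda=False):
    """Run map_function on the data - either directly in this process
    or in a freshly invoked Lambda function."""
    # send the importable name of the function, not the function object itself
    map_function_name = get_func_task_path(map_function)
    encoded_data = encode_payload(data, compression=compression)
    if invoke_lambda:
        encoded_result = send_to_other_lambda(function_in_lambda, map_function_name,
                                              encoded_data, compression=compression)
    else:
        encoded_result = function_in_lambda(map_function_name, encoded_data,
                                            compression=compression)
    return decode_payload(encoded_result, compression=compression)
```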
We still have to define the `send_to_other_lambda` function in `aws_utils`, but what else has happened?
To explain this, I have to tell you a bit beforehand about how we will call the lambda function. We will use the zappa framework for this. We will make an Amazon Lambda API call to the same lambda function we are running in at the moment, which will again call the zappa handler that we are already using to answer our HTTP API calls (remember?). However, this time we are telling zappa not to execute our normal flask entry point as usual, but to call a special function for us, namely the `function_in_lambda`. For this, we also have to pass along the arguments of this function, which means we have to convert them to JSON first (Amazon communicates via JSON between Lambdas). But the `map_function` is a function object - we can not easily convert it to JSON. Or can we? Well, we can convert it to a string quite easily, because a normal python function can be described by its full name (with module etc.). And this is exactly what the `get_func_task_path` function from the zappa package does.
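For example (the module name `my_app` is just for illustration):

```python
from zappa.async import get_func_task_path

task_path = get_func_task_path(feature_calculation)
print(task_path)  # something like "my_app.feature_calculation"
```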
The counterpart in `function_in_lambda` is the `import_and_get_task` function, which turns a name back into a function. We add it at the beginning:
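A sketch of those first lines, assuming `map_function` now always arrives as a string:

```python
from zappa.async import import_and_get_task

def function_in_lambda(map_function, encoded_data, compression=False):
    # map_function arrives as a string like "my_app.feature_calculation"
    map_function = import_and_get_task(map_function)
    ...
```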
But we still have no Lambda caller! Ok, I will just dump the new code for `aws_utils.py` first and we discuss it afterwards:
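The following is a sketch of the idea rather than a verbatim listing - the exact zappa internals (message keys, handler command) may differ between versions:

```python
import json
import os

from zappa.async import LambdaAsyncResponse, get_func_task_path

class LambdaSyncResponse(LambdaAsyncResponse):
    """Like zappa's LambdaAsyncResponse, but invoke the Lambda
    synchronously and keep its response."""

    def _send(self, message):
        message["command"] = "zappa.async.route_lambda_task"
        response = self.client.invoke(
            FunctionName=self.lambda_function_name,
            InvocationType="RequestResponse",  # synchronous instead of "Event"
            Payload=json.dumps(message).encode("utf-8"),
        )
        self.response = json.loads(response["Payload"].read())

def send_to_other_lambda(func, *args, **kwargs):
    """Nearly a copy of zappa.async.run, but synchronous: execute func
    with the given arguments in a new invocation of our own Lambda."""
    lambda_sync = LambdaSyncResponse(
        # our own Lambda's name and region, taken from the runtime environment
        lambda_function_name=os.environ.get("AWS_LAMBDA_FUNCTION_NAME"),
        aws_region=os.environ.get("AWS_REGION"),
    )
    lambda_sync.send(get_func_task_path(func), args, kwargs)
    return lambda_sync.response
```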
First of all you may notice that we are heavily relying on zappa here - because it brings (mostly) everything we need already (actually, it has everything for calling a lambda function asynchronously, but we want a synchronous invocation).
The `send_to_other_lambda` itself is nearly a copy of the `run` function in `zappa.async`, but uses our own response handler for sending to the Lambda: `LambdaSyncResponse`. This class, derived from `LambdaAsyncResponse`, uses an Amazon API call (with `boto3`) to invoke a new lambda, giving it the parameters the `zappa` handler needs, which are:
- the function to call as a string (created with the `get_func_task_path` function)
- the name of the Lambda to invoke (which is our own Lambda - its name is available in an environment variable of the running Lambda)
- the arguments and keyword arguments of the function call
Everything is nicely packed using JSON, and that is it. With this last part, you can now upload everything to your Amazon account (using `zappa update`) and call your own little data science application using the test script in the github repository (just remember to change the URL).
Feel free to test with different data samples, different chunk sizes and different settings for the compression etc. You can also try to boost the performance using different streaming techniques.
What to do next?
Now that we are finished with our implementation, we can go on and profile/test the whole thing. In the next post, I will describe how well this scales, what can be done better, and whether this is fine for a data science application (or not).
Remember, you can find all the code of this post in my github repo under the tag part-four.