We deal with real-time data all the time. Analytics dashboards perform computations and tell us what happened in the last 60 minutes or maybe the last 7 hours. They deal with terabytes of data, and yet they can process all of it in real time. These insights are extremely valuable because you can take the right actions if you know what’s happening. If you run a shopping website, you need to know what happened in the last few hours so that you can boost your sales. Are there a lot of visitors from France? Can I organize a quick French-themed promotion to increase my sales during peak hours? The answers to all these questions lie deep within your data. Spark Streaming is amazing at these things! So how do we do windowed computations in Spark? How can we process this data in real time?
A quick background
Spark Streaming provides windowed computations as one of its main features. This allows us to process data using a sliding window very efficiently. Let’s consider the following figure:
As we can see here, we keep sliding the time window to process the data. The data that falls within the current window is operated upon to produce the output. In our example, the operation is applied over the last 5 seconds of data and the window slides forward by 2 seconds. This means that it aggregates the statistics of the last 5 seconds and displays those results once every 2 seconds.
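To make the idea concrete, here is a minimal sketch in plain Python (not Spark) of what a 5-second window sliding by 2 seconds does; the event timestamps and the helper function are just illustrative assumptions:

```python
# A toy stream of (timestamp_in_seconds, count) events.
events = [(0.5, 1), (1.2, 1), (3.8, 1), (4.1, 1), (6.0, 1)]

WINDOW_LENGTH = 5   # each window covers the last 5 seconds of data
SLIDE_INTERVAL = 2  # the window moves forward every 2 seconds

def window_total(events, window_end, window_length=WINDOW_LENGTH):
    """Sum the counts of all events that fall inside the window."""
    window_start = window_end - window_length
    return sum(v for t, v in events if window_start <= t < window_end)

# Every SLIDE_INTERVAL seconds, aggregate only the events in the current window.
for window_end in range(WINDOW_LENGTH, 9, SLIDE_INTERVAL):
    print(f"window ending at t={window_end}s: {window_total(events, window_end)} events")
```

Each output line corresponds to one "slide" of the window: old events drop out of the aggregation as new ones enter, which is exactly what Spark Streaming manages for us automatically.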
A real example
Things get better when we see an actual example with some code! Let’s consider an example where your website is getting visitors from different countries. Now, you want to see the per-country visitor counts for only the last 15 seconds because you have to do some real-time analysis based on that. Also, you want those counts to be updated once every 6 seconds. For convenience, let’s assume that the input comes in three-letter form, where “usa” is USA, “ind” is India, and “aus” is Australia. If we encounter anything else, we will call it “unknown”. Let’s see how to do this using the Spark Python API:
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def get_countryname(line):
    country_name = line.strip()

    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'

    return (output, 1)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nwindow_count.py <hostname> <port>")

    batch_interval = 1  # base time unit (in seconds)
    window_length = 15 * batch_interval
    frequency = 6 * batch_interval

    spc = SparkContext(appName="WindowCount")
    stc = StreamingContext(spc, batch_interval)
    stc.checkpoint("checkpoint")

    lines = stc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
    window_counts.pprint()

    stc.start()
    stc.awaitTermination()
How to run the code?
Save the above code in a file called “window_count.py”. We need a way to provide real-time input to our system, so let’s set up a small data server using Netcat (a tool available on Unix-like systems). Open up the terminal and type the following:
$ nc -lk 9999
Now, let’s open up a new terminal and run our Spark Python code:
$ cd /path/to/spark-1.5.1
$ ./bin/spark-submit /path/to/window_count.py localhost 9999
Go back to the Netcat terminal and start entering the three-letter codes like “ind”, “usa”, “aus”, “xyz”, and so on. If you look in the Spark terminal, you should be able to see the counts being updated. If you observe carefully, you will see that those counts will go down to 0 if you wait for more than 15 seconds without entering any new data into the Netcat terminal. You are all set! You are now processing data in real time using windowed computations in Spark.
What happened in the code?
If you need a quick refresher on Spark Streaming, you should check out my previous blog post. As we can see in our code, we initialize the StreamingContext object “stc”. We read the input from the data server into the “lines” DStream and map each line to the corresponding country name using the “map(get_countryname)” transformation. After this, we need to compute the counts for the last 15 seconds. For this operation, we use “reduceByKeyAndWindow”. It takes a reduce function and uses it to combine all the values for each key into a single value. The beauty is that it also takes the window length and the sliding interval as input parameters, so it automatically considers only the relevant window when doing the computation.
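A rough plain-Python analogue of this step might look as follows; this is my own sketch of the reduce-by-key idea, not Spark’s actual implementation, and the helper name “reduce_by_key” is hypothetical:

```python
def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key, then fold each group with func."""
    grouped = {}
    for key, value in pairs:
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return grouped

# Pairs produced by map(get_countryname) for the records inside one window.
pairs_in_window = [('USA', 1), ('India', 1), ('USA', 1), ('Unknown', 1)]
add_func = lambda x, y: x + y

print(reduce_by_key(pairs_in_window, add_func))
```

Spark does the same kind of per-key folding, except that it only looks at the records whose timestamps fall inside the current window.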
We use “invAddFunc” to speed up the computation. We want the counts to be updated once every 6 seconds while the window length is 15 seconds, so consecutive windows obviously overlap in time. Instead of recomputing that overlapping part from scratch, Spark uses “invAddFunc” to subtract the data that slid out of the window, avoiding recalculating the same thing again. We then go ahead and print the counts using “window_counts.pprint()”.
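The incremental trick can be sketched in a few lines of plain Python; this is just an illustration of the idea under my own assumptions, not Spark’s internal code:

```python
add_func = lambda x, y: x + y      # combine counts entering the window
inv_add_func = lambda x, y: x - y  # "undo" counts that slid out of the window

def update_window(old_total, entering, leaving):
    """Incrementally update a windowed sum instead of recomputing it."""
    total = old_total
    for v in entering:
        total = add_func(total, v)   # add the new data
    for v in leaving:
        total = inv_add_func(total, v)  # subtract the expired data
    return total

# Previous window counted 10 events; 3 new events arrived, 4 old ones expired.
print(update_window(10, [1, 1, 1], [1, 1, 1, 1]))
```

With a 15-second window sliding every 6 seconds, 9 seconds of data overlap between consecutive windows; the inverse function lets Spark touch only the 6 seconds entering and the 6 seconds leaving, rather than re-summing all 15.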