There is a lot of data being generated in today’s digital world, so there is a high demand for real time data analytics. This data usually comes in bits and pieces from many different sources. It can come in various forms like words, images, numbers, and so on. Twitter is a good example of words being generated in real time. We also have websites where statistics like number of visitors, page views, and so on are being generated in real time. There are so much data that it is not very useful in its raw form. We need to process it and extract insights from it so that it becomes useful. This is where Spark Streaming comes into the picture! It is exceptionally good at processing real time data and it is highly scalable. It can process enormous amounts of data in real time without skipping a beat. So how exactly does Spark do it? How do we use it?
How does it work?
If you need a quick refresher on Apache Spark, you can check out my previous blog posts where I have discussed the basics. I have also described how you can quickly set up Spark on your machine and get started with its Python API. Spark Streaming is based on the core Spark API and it enables processing of real-time data streams. We can process this data using different algorithms by using actions and transformations provided by Spark. This processed data can be used to display live dashboards or maintain a real-time database. You know how people display those animated graphs based on real time data? This is how they do it!
Let’s see how Spark Streaming processes this data. It receives input data streams and then divides it into mini-batches. These mini-batches of data are then processed by the core Spark engine to generate the output in batches. Spark’s basic programming abstraction is Resilient Distributed Datasets (RDDs). To simplify it, everything is treated as an RDD (like how we define variables in other languages) and then Spark uses this data structure to distribute the computation across many machines. Spark Streaming provides something called DStream (short for “Discretized Stream”) that represents a continuous stream of data. A live stream of data is treated as a DStream, which in turn is a sequence of RDDs. These DStreams are processed by Spark to produce the outputs.
An actual example
Everything feels better if we just discuss an actual use case. Let’s consider a simple real life example and see how we can use Spark Streaming to code it up. Let’s say you are receiving a stream of 2D points and we want to keep a count of how many points fall in each quadrant. We will be getting these points from a data server listening on a TCP socket. Let’s see how to do it in Spark. The code below is well commented, so just read through it and you’ll get an idea. We will be discussing it in detail later in this blog post.
import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext # Function to map the point to the right quadrant def get_quadrant(line): # Convert the input string into a pair of numbers try: (x, y) = [float(x) for x in line.split()] except: print "Invalid input" return ('Invalid points', 1) # Map the pair of numbers to the right quadrant if x > 0 and y > 0: quadrant = 'First quadrant' elif x < 0 and y > 0: quadrant = 'Second quadrant' elif x < 0 and y < 0: quadrant = 'Third quadrant' elif x > 0 and y < 0: quadrant = 'Fourth quadrant' elif x == 0 and y != 0: quadrant = 'Lies on Y axis' elif x != 0 and y == 0: quadrant = 'Lies on X axis' else: quadrant = 'Origin' # The pair represents the quadrant and the counter increment return (quadrant, 1) if __name__ == "__main__": if len(sys.argv) != 3: raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>") # Initialize a SparkContext with a name spc = SparkContext(appName="QuadrantCount") # Create a StreamingContext with a batch interval of 2 seconds stc = StreamingContext(spc, 2) # Checkpointing feature stc.checkpoint("checkpoint") # Creating a DStream to connect to hostname:port (like localhost:9999) lines = stc.socketTextStream(sys.argv, int(sys.argv)) # Function that's used to update the state updateFunction = lambda new_values, running_count: sum(new_values) + (running_count or 0) # Update all the current counts of number of points in each quadrant running_counts = lines.map(get_quadrant).updateStateByKey(updateFunction) # Print the current state running_counts.pprint() # Start the computation stc.start() # Wait for the computation to terminate stc.awaitTermination()
How to run the program?
We will discuss the details of the above program shortly. For now, just save it in a file called “quadrant_count.py”. As we discussed earlier, we need to set up a simple server to get the data. Let’s set up the data server quickly using Netcat. It is a utility available in most Unix-like systems. Open the terminal and run the following command:
$ nc -lk 9999
Then, in a different terminal, navigate to your spark-1.5.1 directory and run our program using:
$ ./bin/spark-submit /path/to/quadrant_count.py localhost 9999
Make sure you provide the right path to “quadrant_count.py”. You can enter the datapoints in the Netcat terminal like this:
The output in the Spark terminal will look like this:
How does it work?
We start the program by importing “SparkContext” and “StreamingContext”. StreamingContext is the main entry point for all our data streaming operations. We create a StreamingContext object with a batch interval of 2 seconds. It means that all our quadrant counts will be updated once every 2 seconds. Using this object, we create a “DStream” that reads streaming data from a source, usually specified in “hostname:port” format, like localhost:9999. In our example, “lines” is the DStream that represents the stream of data that we receive from the server.
In this DStream, each item is a line of text that we want to process. We split the lines by space into individual strings, which are then converted to numbers. In this case, each line will be split into multiple numbers and the stream of numbers is represented as the lines DStream. Next, we want to count the number of points belonging to each quadrant. The lines DStream is further mapped to a DStream of (quadrant, 1) pairs, which is then reduced using updateStateByKey(updateFunction) to get the count of each quadrant. Once it’s done, we will print the output using running_counts.pprint() once every 2 seconds.
We use “updateStateByKey” to update all the counts using the lambda function “updateFunction”. This is actually the core concept here, so we need to understand it completely if we want to write meaningful code using Spark Streaming. Let’s look at the following line:
updateFunction = lambda new_values, running_count: sum(new_values) + (running_count or 0)
This function basically takes two inputs and computes the sum. Here, “new_values” is a list and “running_count” is an int. This function just sums up all the numbers in the list and then adds a new number to compute the overall sum. This list just has a single element in our case. The values we get will be something a list, say , for new_values indicating that the count is 1, and the running_count will be something like 4 indicating that there are already 4 points in this quadrant. So we just sum it up and return the updated count.
Start and stop
One thing to note here is that the real processing hasn’t started yet. Spark Streaming only sets up the computation it will perform when it is started only when it’s needed. This is called lazy evaluation and it is one of cornerstones of modern functional programming languages. There’s no need to evaluate anything until it’s actually needed, right? To start the processing after all the transformations have been setup, we finally call stc.start() and stc.awaitTermination(). We are done! You can now process data in real time using Spark Streaming. It is great at processing data in real time and data can come from many different sources like Kafka, Twitter, or any other streaming service. Enjoy fiddling around with it!
One thought on “Analyzing Real-time Data With Spark Streaming In Python”
Very nice article and got lot of information…..