Recently I’ve had the opportunity to dig into Apache Spark, thanks to some training from Brian Bloechle from Cloudera.
What is Spark?
Fast, flexible, and developer-friendly, Apache Spark is the leading platform for large-scale SQL, batch processing, stream processing, and machine learning.
Java, Scala, Python and R are first-class citizens when it comes to consuming the various Spark APIs. I’ll cover PySpark in more detail.
Spark is a cluster-manager-agnostic processing engine that can target Spark Standalone, Hadoop’s YARN, Apache Mesos and Kubernetes. In the context of Spark, some useful surrounding ecosystem projects to be aware of:
- Apache Spark: a high-performance, general-purpose data processing engine.
- Apache Hadoop: HDFS, MapReduce, and YARN.
- Apache Parquet: a fast, framework/data model/programming language agnostic columnar storage format. This is Spark’s default storage format.
- Apache Impala: a low-latency, massively parallel SQL engine for Hadoop clusters.
- Apache Hive: provides a metastore service for projecting structure onto existing (unstructured) data in HDFS, enabling SQL and JDBC access. Interestingly, Hive’s metastore has become a de facto standard for representing table schemas, and is used by other structured systems including Impala.
- Hue: Hadoop User Experience (HUE) is a web UI for Hadoop, which includes a general-purpose HDFS browser and query editors for Hive and Impala.
- Apache Arrow: an in-memory columnar data representation that avoids serialization overhead.
Data Science 101
The O’Reilly Python Data Science Handbook is great quality, and covers the essential Python libraries such as NumPy, Pandas, Matplotlib and Scikit-Learn. The machine learning chapter decomposes and explains classical algorithms (e.g. linear regression, k-means clustering, Gaussian mixture models).
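To give a flavour of that material, here’s a minimal scikit-learn k-means sketch on synthetic data (the data and parameters are purely illustrative):

# k-means on two synthetic Gaussian blobs; everything here is illustrative.
import numpy as np
from sklearn.cluster import KMeans

rng = np.random.RandomState(42)
points = np.vstack([rng.normal(0, 1, (50, 2)), rng.normal(5, 1, (50, 2))])

kmeans = KMeans(n_clusters=2, n_init=10, random_state=42).fit(points)
print(kmeans.cluster_centers_)  # one centre near (0, 0), one near (5, 5)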
Spark 101
Spark, given its in-memory performance and elegant programming model, has become the framework of choice when processing big data, overtaking Hadoop’s old MapReduce paradigm.
Spark Core Concepts
Fundamental to Spark is the Resilient Distributed Dataset, or RDD: an abstraction that represents an immutable collection of elements partitioned across the nodes in a cluster. Operations on RDDs are splittable across nodes, leading to fast and scalable parallel processing.
RDDs can be created from simple text files, SQL databases, NoSQL stores, AWS S3, and tons more. The Spark Core APIs are built atop the RDD, enabling traditional map-reduce functionality, but also providing built-in support for joining and shuffling data sets, filtering, sampling, and aggregation.
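As a minimal sketch of the RDD API, assuming an existing SparkSession named spark and a placeholder text file path, a classic word count looks like this:

# Word count over a plain text file; the path is a placeholder.
rdd = spark.sparkContext.textFile("/tmp/sample.txt")

counts = (rdd.flatMap(lambda line: line.split())   # split lines into words
             .map(lambda word: (word, 1))          # pair each word with a count of 1
             .reduceByKey(lambda a, b: a + b))     # sum counts per word

print(counts.take(5))  # take() is the action that triggers execution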
Spark runs in a distributed fashion by combining a core driver process, which splits a Spark application into tasks and distributes them among many executor processes that perform the actual work. These executors can be scaled up and down as required for the application’s needs.
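As a rough sketch of how executor resources can be requested when building a session against YARN (the values are illustrative, and assume a cluster is actually available):

# Request executor resources up front when targeting YARN.
# (Alternatively, spark.dynamicAllocation.enabled lets Spark scale executors itself.)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("yarn") \
    .appName("sizing-example") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()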
Further reading:
- Spark Structured Streaming
- Spark GraphFrames
- Apache Parquet
- A comparison of file formats and storage engines from CERN
General Spark Doco: https://spark.apache.org/docs/latest/
RDD Resilient Distributed Datasets: https://spark.apache.org/docs/latest/rdd-programming-guide.html
Data frames are an important concept to grasp: they are the unit of operation, and are conceptually equivalent to a table in a relational database. Due to their functional underpinnings they are immutable, ephemeral and lazily evaluated.
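A minimal data frame sketch, assuming an existing SparkSession named spark and a placeholder CSV path; transformations are lazy, and nothing executes until an action such as show() or count() is called:

# Read a CSV into a data frame; the path is a placeholder.
riders = spark.read.csv("/data/riders/", header=True, inferSchema=True)

students = riders.filter(riders.student == 1).select("id", "first_name", "sex")  # lazy transformation
students.show(5)  # action: triggers the actual work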
Parquet is the default and preferred binary format for representing data. Storage of data in memory should be done contiguously if possible, and there are two options (a Parquet read/write sketch follows the list):
- Row format: store each complete record contiguously. Having all fields of a record together makes this more applicable to ETL use cases. Apache Avro is one row-oriented representation, and Hive is better suited to row-based representations.
- Columnar format: store particular columns in contiguous locations. Apache Parquet is a columnar format and is the default for Spark; Impala is well suited to Parquet.
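A quick sketch of writing and reading Parquet (the output path is a placeholder):

# Write the data frame out as Parquet, then read it back.
riders.write.mode("overwrite").parquet("/tmp/riders_parquet")

riders_pq = spark.read.parquet("/tmp/riders_parquet")
riders_pq.printSchema()  # the schema travels with the Parquet files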
It’s possible to convert a PySpark data frame into structures from the SciPy ecosystem, including a Pandas data frame.
It’s possible to tweak the local master parameter below:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("connect-local") \
    .getOrCreate()
Like this:
.master("local[4]") // use four core
.master("local[*]") // use all cores
Render a Pandas formatted data frame:
riders.limit(5).toPandas()
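If Apache Arrow is available, enabling it can significantly speed up the toPandas() conversion; the exact config key depends on the Spark version (Spark 2.3/2.4 use the key below, Spark 3 uses spark.sql.execution.arrow.pyspark.enabled):

# Enable Arrow-accelerated conversion between Spark and Pandas data frames.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
riders_pdf = riders.limit(5).toPandas()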
In CDSW (Cloudera Data Science Workbench), Shift+Return lets you do multi-line editing.
Spark SQL Built-in Functions: https://spark.apache.org/docs/latest/api/sql/index.html
riders.createOrReplaceTempView("riders")
spark.sql("select count(id), count(distinct id) from riders").show()
Note the API generations: the RDD-based MLlib package is the legacy machine learning API, superseded by the newer DataFrame-based spark.ml package; similarly, the older Spark SQL entry points (SQLContext/HiveContext) have been superseded by SparkSession.
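A minimal sketch of the newer DataFrame-based spark.ml API; using work_lat as the label and the home coordinates as features is purely illustrative (and assumes those columns are numeric):

# Assemble feature columns into the single vector column spark.ml expects.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=["home_lat", "home_lon"], outputCol="features")
train = assembler.transform(riders.na.drop(subset=["home_lat", "home_lon", "work_lat"]))

# Fit a linear regression model and inspect its parameters.
lr = LinearRegression(featuresCol="features", labelCol="work_lat")
model = lr.fit(train)
print(model.coefficients, model.intercept)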
Tip: some functions and methods behave differently. For example, distinct() versus countDistinct():
riders.select("sex").distinct().count()  # returns 3

from pyspark.sql.functions import count, countDistinct
riders.select(count("id"), countDistinct("sex")).show()  # returns 2 for sex
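The likely cause of the discrepancy is null handling: distinct() keeps null as a value, whereas countDistinct() ignores nulls. Dropping nulls first reconciles the two:

riders.select("sex").distinct().show()             # inspect the distinct values; a null row would explain the extra count
riders.select("sex").na.drop().distinct().count()  # drops nulls first; if null is the culprit this returns 2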
General advice: when working with big data, consider summarising or approximating the data, e.g. with approxQuantile:
# Use the `approxQuantile` method to get customized (approximate) quantiles:
riders.approxQuantile("home_lat", \
probabilities=[0.0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 1.0], \
relativeError=0.1)
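Other lightweight ways to summarise a large data frame include pulling a small random sample down to Pandas for local exploration, or using the built-in summary statistics; the fraction and columns below are illustrative:

# Sample roughly 1% of rows and convert locally; describe() computes count/mean/stddev/min/max.
sample_pdf = riders.sample(withReplacement=False, fraction=0.01, seed=42).toPandas()
riders.describe("home_lat", "home_lon").show()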
Sample Data Analysis
The training exercises worked with the following datasets; for each one, the header (where present) and sample records are shown.
riders (1 big file)
id,birth_date,start_date,first_name,last_name,sex,ethnicity,student,home_block,home_lat,home_lon,work_lat,work_lon
220200000001,1962-03-18,2017-01-01,Natalie,Prosser,female,White,0,380170405002188,46.816399,-96.874038,46.831427,-96.827786
rides (1 big file)
id,driver_id,rider_id,date_time,utc_offset,service,origin_lat,origin_lon,dest_lat,dest_lon,distance,duration,cancelled,star_rating
0000000001,220200000214,220200000084,2017-02-01 00:14,-6,,46.850956,-96.902849,46.860050,-96.825442,10123,729,0,5
ride_routes (1 file per day)
Tab-delimited, no header.
0000000001 0000000001 46.849960 -96.901848 0 0
0000000001 0000000002 46.850060 -96.901558 25 6
0000000001 0000000003 46.850090 -96.901405 37 9
ride_reviews
0000000009 Dale is extremely cordial.
0000000037 Very junky car.
0000000071 most awful stench of all time! throw away your air freshener!
0000000083 No trouble of note.
drivers (1 big file)
id,birth_date,start_date,first_name,last_name,sex,ethnicity,student,home_block,home_lat,home_lon,vehicle_make,vehicle_model,vehicle_year,vehicle_color,vehicle_grand,vehicle_noir,vehicle_elite,rides,stars
220200000007,1996-12-21,2017-01-01,Adam,Abrahamson,male,White,1,270270204001008,46.868308,-96.786160,Chevrolet,Cruze,2013,gray,0,0,0,89,398
offices (1 file)
office_id,postal_code,city,country
1,93300,Paris,France
2,13006,Marseille,France
demographics (1 big file)
block_group median_income median_age
020130001001 53125 43.4
020130001002 63917 45.3
020130001003 60227 36.0
020160001001 57500 39.6
020160002001 88750 36.7
weather (1 file)
Station_ID,Date,Max_TemperatureF,Mean_TemperatureF,Min_TemperatureF,Max_Dew_PointF,MeanDew_PointF,Min_DewpointF,Max_Humidity,Mean_Humidity,Min_Humidity,Max_Sea_Level_PressureIn,Mean_Sea_Level_PressureIn,Min_Sea_Level_PressureIn,Max_VisibilityMiles,Mean_VisibilityMiles,Min_VisibilityMiles,Max_Wind_SpeedMPH,Mean_Wind_SpeedMPH,Max_Gust_SpeedMPH,PrecipitationIn,CloudCover,Events,WindDirDegrees
KFAR,2017-01-01,27,22,17,20,14,10,85,70,46,30.17,30.04,29.71,10,9,1,17,12,,0.01,7,Snow,33
KFAR,2017-01-02,21,17,12,18,16,10,92,87,73,30.15,30.07,29.97,10,4,0,20,12,,0.20,8,Fog-Snow,26
KFAR,2017-01-03,12,2,-8,7,-2,-15,84,77,58,30.37,30.21,30.02,10,5,2,25,20,34,0.00,6,Snow,314
data_scientists (1 file)
employee_id,first_name,last_name,office_id
63,Sophia,Laurent,1
88,Thomas,Dubois,1
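As a closing sketch, loading a couple of these datasets and joining them might look like this (the paths are placeholders for wherever the files land in HDFS):

# Load rides and drivers, join on the driver id, and summarise ratings by vehicle make.
rides = spark.read.csv("/data/rides/", header=True, inferSchema=True)
drivers = spark.read.csv("/data/drivers/", header=True, inferSchema=True)

joined = rides.join(drivers, rides.driver_id == drivers.id)
joined.groupBy("vehicle_make").avg("star_rating").show()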