Every business now has more data than spreadsheets and traditional databases can handle, and chief data officers across digital enterprises are responding by building data lakes at scale on Hadoop. The data lake journey, however, is fraught with governance challenges down the line if it is not shored up by clear metadata management and governance standards from the start. It turns out that simply opening the lake for everyone to bring in their own data, as its purpose intends, is a guaranteed way for the lake to quickly turn into a swamp. There are numerous ways to bring data into Hadoop, and opening and supporting every available protocol is a governance nightmare.
Instead, we found that the best way to ensure a sustainable metadata governance standard is to design a single data manifold -- a self-service, WYSIWYG data ingestion interface -- that quarantines input data and enforces a strict annotation, quality, and ownership standard for raw data. A single manifold also addresses challenges like --
There are myriad ways to ingest, infer, characterize, and store raw data in the data lake environment. Below are a few examples of bringing data into the lake from local files (Excel, CSV, JSON, XML), enterprise databases (SQL Server, MySQL, Postgres, Oracle, SAP BW, Teradata), cloud platforms (Google BigQuery, Salesforce, Amazon S3), and the Internet. Although we do not show the drag-and-drop user interface, the constructs underlying the data bridge are simple and easily enabled by Spark.
There are many built-in ways to ingest local data files. If the file is native to the big data platform (like Avro or JSON), you can use Spark's data connector framework directly. If the files are binary, like Excel, you can use Pandas to convert them into Spark dataframes (and, from there, into the Hive warehouse).
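For instance, a file already in a platform-native format can be loaded in one line through Spark's connector framework. A minimal sketch (the file paths are hypothetical, and the Avro reader assumes the spark-avro package is on the classpath):
# JSON is supported out of the box by Spark's reader
events_df = sqlCtx.read.json('events.json')
# Avro goes through the spark-avro connector package
telemetry_df = sqlCtx.read.format('com.databricks.spark.avro').load('telemetry.avro')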
# pandas for local files, re for column-name cleanup, and Spark SQL functions;
# sqlCtx and pd_display are provided by the notebook environment
import re
import pandas as pd
from pyspark.sql import functions as F

# Reading an Excel file
finance_details = pd.read_excel(
    'http://go.microsoft.com/fwlink/?LinkID=521962')
# Display a preview of the file
pd_display(
    finance_details,
    '''Preview of the Excel file from http://go.microsoft.com/fwlink/?LinkID=521962'''
)
# Describe the dataset (data columns and their percentile stats)
pd_display(finance_details.describe(include='all'), '''Data characteristics''')
# Infer the dataset schema (data columns, types)
pd_display(
    pd.DataFrame(
        list(zip(finance_details.columns.values,
                 finance_details.infer_objects().dtypes.values)),
        columns=['column', 'dtype']),
    '''Data types''')
# Create a Spark (aka "big data") frame out of it
finance_df = sqlCtx.createDataFrame(finance_details)
# Display a preview of the big dataset
pd_display(
    finance_df.limit(10).toPandas(),
    '''Preview of the Excel file in Spark's memory''')
# Save the Spark dataframe into the data lake, overwriting data if it already exists
# Make the column names safe for Hadoop standards
finance_df.select([
    F.col('{0}'.format(col)).alias(
        re.sub('[^0-9a-zA-Z_]', '_', col).strip().strip('_'))
    for col in finance_df.columns
]).write.parquet(
    'finance_accts.parquet', mode='overwrite')
# Verify data on disk is intact
pd_display(
    sqlCtx.read.parquet('finance_accts.parquet').limit(10).toPandas(),
    '''Preview of the dataframe in the data lake''')
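If the lake's catalog is the Hive metastore, the same cleaned dataframe can also be registered as a table so downstream consumers can discover and query it declaratively. A minimal sketch, assuming a Hive-enabled sqlCtx (the table name is illustrative):
# Register the cleaned Parquet data as a table in the metastore
sqlCtx.read.parquet('finance_accts.parquet').write.saveAsTable(
    'finance_accts', mode='overwrite')
# The table is now queryable by name
pd_display(
    sqlCtx.sql('select * from finance_accts limit 10').toPandas(),
    'Preview of the registered finance_accts table')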
HTML and network files can be ingested on top of traditional JSch-like API layers. Below, we show examples of ingesting an HTML resource from the web, an SFTP resource from a file server, and a file from a Windows share.
# Read an HTML resource: in this case, the most populous countries in the world from Wikipedia
# First table from https://en.wikipedia.org/wiki/List_of_countries_by_population_(United_Nations)
pd_display(
    pd.read_html(
        'https://en.wikipedia.org/wiki/List_of_countries_by_population_(United_Nations)',
        header=0)[0],
    'Most populous countries in the world from HTML page')
# Write a simple routine to read credential files from the home directory.
# We do this so as to not inline username/password information in this public notebook.
def read_string(filename):
    with open(filename, 'r') as f:
        return f.read().strip()


# Read a file from an SFTP server
from os.path import expanduser

# Get home directory
home = expanduser("~/")

# Import pysftp and copy the SFTP file locally. For big files you may work with a
# file handle instead, or use the spark-sftp connector.
import pysftp

# Open a disposable connection
with pysftp.Connection(
        '192.168.1.50',
        username=read_string('{0}/.uname.txt'.format(home)),
        password=read_string('{0}/.pwd.txt'.format(home))) as sftp:
    with sftp.cd('/tmp'):  # copy locally
        sftp.get('iris.csv', 'iris.csv')

# Make a dataframe and display it
pd_display(pd.read_csv('iris.csv'), "Iris dataset from an SFTP file")
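For a file on a Windows share, the pattern is the same: copy the file locally through an SMB client and hand it to Pandas or Spark. A minimal sketch using the pysmb package (the host, share, path, and credential file names are hypothetical; if the share is already mounted via CIFS, it can simply be read as a local path):
from smb.SMBConnection import SMBConnection

# Open a disposable SMB session; credentials stay in dot-files as before
conn = SMBConnection(
    read_string('{0}/.win_uname.txt'.format(home)),
    read_string('{0}/.win_pwd.txt'.format(home)),
    'datalake-edge',  # this client's NetBIOS name
    'fileserver01',   # the remote machine's name
    use_ntlm_v2=True)
conn.connect('192.168.1.60', 139)
# Copy the file from the share locally, then load it with Pandas as usual
with open('sales.csv', 'wb') as local_file:
    conn.retrieveFile('shared', '/exports/sales.csv', local_file)
conn.close()
pd_display(pd.read_csv('sales.csv'), 'Sales data from a Windows share')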
Enterprises are increasingly moving cloud first, where data classification may be more open. Ariba, Workday, Salesforce, and AWS have become increasingly common, and your data often arrives in the cloud first. To import these datasets into the data lake, there is a decent choice of connectors available today.
# We will demonstrate bringing Salesforce datasets into the data lake
sfdc_username = read_string('{0}/.sfdcusername.txt'.format(home))
sfdc_password = read_string('{0}/.sfdcpassword_token.txt'.format(home))
# Read the dataset through the spark-salesforce connector
sfdc_accounts = sqlCtx.read.format("com.springml.spark.salesforce").option(
    'username', sfdc_username).option('password', sfdc_password).option(
        'soql', 'SELECT Id, Name, BillingCity FROM Account').option(
            'version', '35.0').load()
# Show the preview
pd_display(sfdc_accounts.toPandas(),
           "Salesforce accounts read into the big data lake")
Enterprises also have existing databases (SQL Server, Oracle), warehouses (Teradata, Netezza), and data marts (Cognos, MicroStrategy). The knowledge in these systems needs to be brought into the single data lake as well (logically, if not physically) to enable end-to-end analytical insights. Most of these data sources (including NoSQL databases) provide standard JDBC connectivity for Spark.
Below, we show an example of ingesting from a simple SQLite database; the approach scales just as easily to bigger MPP warehouses like Teradata and SAP.
The JDBC path opens a world of possibilities. Note that Spark dataframes (and consequently any big data frames) can also be written back to JDBC sources, making interoperability with existing databases a breeze; a write-back sketch follows the read example below.
import os

# Make a JDBC connection from the local SQLite database into a Spark dataframe
jdbc_db = sqlCtx.read.jdbc(
    url=r"jdbc:sqlite:{0}/chinook.db".format(os.getcwd()),  # The source database file
    table="(select * from customers) a",  # Prune the dataset that you load
    properties={'driver': 'org.sqlite.JDBC'})  # Driver must be on the classpath
# Display a preview
pd_display(
    jdbc_db.toPandas(),
    'Any JDBC source can be materialized in memory and constructed as a Spark dataframe')
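And as noted above, the round trip works the other way too; a minimal sketch that writes the same dataframe back out over JDBC into a second, hypothetical SQLite database:
# Write the dataframe back to a JDBC source (here, another local SQLite file)
jdbc_db.write.jdbc(
    url=r"jdbc:sqlite:{0}/chinook_copy.db".format(os.getcwd()),
    table='customers_copy',
    mode='overwrite',
    properties={'driver': 'org.sqlite.JDBC'})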
So what can we do with the data in the lake? Whether the data is physically copied into the warehouse, saved to a local columnar file, or logically referenced in memory from a remote source, most analytics can be applied directly in place. This makes ephemeral analytics and pipelined ETL at scale easy to implement in Spark.
Below, we pick the simple Iris flower dataset and visually explore the separability of its features.
# Read the Iris dataset (downloaded over SFTP earlier) into a Pandas dataframe
iris_pandas_dataset = pd.read_csv('iris.csv')
# Just to show a more complex data file...
# Save the Pandas object as a JSON file
iris_pandas_dataset.to_json('iris.json', orient='records')
# Just for demonstration, read the JSON file back natively in Spark
iris_dataset = sqlCtx.read.json('iris.json')
# Show a preview of the data
pd_display(iris_dataset.toPandas(), "Preview of the raw Iris dataset")
# Describe the dataset
pd_display(
    iris_dataset.describe(
        ['`{0}`'.format(colname)
         for colname in iris_dataset.columns]).toPandas().T,
    'Column statistics of the dataset')
To check the separability of the flowers, let us render a scatter plot of the data. If the species are separable, we ought to easily spot the empty space between the scatter groups.
%matplotlib inline
# seaborn drives the pairwise scatter plots
import seaborn as sns

# Plot the dataset; color each species of Iris with a different color
sns.pairplot(iris_dataset.toPandas(), hue="species");
We observe that the Setosa species is clearly separable from the other two. The Versicolor and Virginica species of Iris, however, are not easily separable.
Raw files do not come prepped for analysis; the raw data must first be shaped for analytics. You can do that in the lake before downstream data marts consume the data, or directly atop the ODBC connector over Impala/Tez/Spark. Below, we show how to implement a star schema in Spark.
Men are from Mars. Women are from Venus.
The MovieLens data (http://grouplens.org/datasets/movielens/) provides Likert-scale ratings of movies as entered by viewers. Based on this data, we want to identify whether there are movies whose appeal differs substantively between men and women and, if so, find the top 10 movies with the maximum disagreement between men and women.
Below is also an example of how to mix imperative, declarative, and functional-style programming to shape the data.
# Bring the ratings, users, and movies data from the web
# Ratings fact table
ratings = sqlCtx.createDataFrame(
    pd.read_csv(
        'http://files.grouplens.org/datasets/movielens/ml-100k/u.data',
        sep='\t',
        names=['user_id', 'item_id', 'rating', 'timestamp'],
        usecols=[0, 1, 2]))
# Users dimension
users = sqlCtx.createDataFrame(
    pd.read_csv(
        'http://files.grouplens.org/datasets/movielens/ml-100k/u.user',
        sep='|',
        names=['user_id_dim', 'age', 'gender', 'occupation', 'zip'],
        usecols=[0, 2]))
# Movies dimension (the file has no header row and uses latin-1 encoded titles)
movies = sqlCtx.createDataFrame(
    pd.read_csv(
        'http://files.grouplens.org/datasets/movielens/ml-100k/u.item',
        sep="|",
        header=None,
        usecols=[0, 1],
        encoding='latin-1'),
    schema=['movie_id_dim', 'movie_title'])
# Display the raw dimensional tables of users and movies
pd_display(movies.toPandas(), 'Movies dimension table from MovieLens')
pd_display(users.toPandas(), 'Users dimension table from MovieLens')
# Display the factual ratings table
pd_display(ratings.toPandas(), 'Ratings fact table from MovieLens')
We can overlay the dimensional tables onto the factual data with a simple multi-way join. Let us employ simple declarative (SQL) syntax in Spark to accomplish it. The strongest advantage of using Spark, besides its built-in machine learning libraries, is the ability to mix and match imperative, declarative, and functional-style programming in the same in-memory workflow.
# Join the dimensional definitions with the fact table to obtain a denormalized representation
# (reg() is the notebook helper that registers a dataframe as a temp table and returns its name)
norm_ratings = sqlCtx.sql(
    'select a.*, b.*, c.* from {0} a join {1} b join {2} c '
    'on a.user_id = b.user_id_dim and a.item_id = c.movie_id_dim'.format(
        ratings.reg(), users.reg(), movies.reg()))
# Show a preview of the star-schema table
pd_display(
    norm_ratings.limit(10).toPandas(),
    "Denormalized star-schema MovieLens ratings table")
We will employ functional-style aggregations (groupBy, agg) to compute the average rating for every movie as rated by each gender.
The grouping, however, still leaves each gender's ratings on successive rows. To apply simple delta logic, it is best to place both genders' ratings for a movie on the same row in two different columns. To get those columns, we pivot the grouped table.
# Let us average the rating per movie per gender
# Group by movie name and gender; compute the average rating per group
avg_ratings = norm_ratings.groupBy(
    ['movie_title', 'gender']).agg(F.avg('rating').alias('rating'))
# Display the aggregated average ratings
pd_display(avg_ratings.toPandas(), "Average ratings by each gender per movie")

# Pivot to flatten the gender averages per movie onto a single line
# Group each movie into a single row, but project the individual gender ratings
# as separate columns in the result.
pivot_avg_ratings = avg_ratings.groupBy('movie_title').pivot('gender').agg(
    F.first('rating'))
# Display the pivoted values to show the averages
pd_display(pivot_avg_ratings.toPandas(),
           "Average ratings per movie, separated by gender ratings in columns")
With the ratings aligned in the right shape, we can compute the movies that have the maximum separation of ratings between men and women. A positive diff means men like the movie more than women; a negative diff means women like it more than men.
# Compute the movies with the maximum ratings polarity between the two genders and display the top 10
polar_movies = sqlCtx.sql(
    'select movie_title, F as female_rating, M as male_rating, (M-F) as diff from {0} '
    'order by abs(diff) desc limit 10'.format(pivot_avg_ratings.reg()))
# Display the movies
pd_display(polar_movies.toPandas(),
           "Top 10 movies that are rated differently by men and women.")
The key messages from this notebook are --