Capture data in realtime and draw dashboards for you cannot improve what you cannot measure. When business shows a trend -- up or down -- it is imperative for you to stay ahead of the realtime intelligence and take proactive action versus reacting after the fact.
Underneath we show a simple web analytics module that measures --
Ideally in real world, you want to open a syslog daemon directly on top of the Spark Streaming context and receive data in near realtime. The idea is to aggregate, visualize, monitor, and alert any non-normative behavior. There are existing tools like ELK, SILK, Druid, and Splunk that already provide this intelligence out of the box, but this notebook is intended to be educational.
# To generate the socket stream, use the following code
from pyspark.streaming import StreamingContext
# Create the streamingcontext
# ssc = StreamingContext(sc)
# Create a DStream that will connect to hostname:port, like localhost:9999
# log_lines = ssc.socketTextStream("localhost", 9998)
# Start the socket monitor
# ssc.start() # Uncomment if necessary for realtime data
# Run perpetually updating the dashboards constantly.
# ssc.awaitTermination() #Uncomment if necessary for realtime data
Since we are running in a notebook context, let us just demonstrate the log processing with local log files. We are using log file sample downloaded from https://github.com/elastic/examples/tree/master/ElasticStack_apache
# Read the log file
log_lines = sc.textFile('apache_logs.txt')
# Review the format
log_lines.take(1)
import apache_log_parser
# Make a parser to parse out the web line
line_parser = apache_log_parser.Parser("%h %l %u %t \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\"")
# Parse and return interesting bits
def parse_line(logline):
# Parse and return the dict as a Row of DataFrame
row_dict = {}
try:
row_dict = { k:v for (k,v) in line_parser.parse(logline).iteritems() if 'datetimeobj' not in k}
except:
pass
return row_dict
# For each log line, parse and show the parse result
import pprint
pprint.pprint(sc.parallelize(log_lines.take(1)).map(parse_line).filter(lambda x: len(x) > 5).collect()[0])
# Show a preview of the parse in a table form
# Collect only hits from mobile devices
log_lines_df = log_lines.map(parse_line).filter(lambda x: len(x) > 5).map(lambda x: Row(**x)).toDF(sampleRatio=0.5)
# Display the data
display(log_lines_df.toPandas())
Let us measure which countries and what browsers are most traffic originating from.
# Define a IP to country map
import pygeoip
def to_country(ip):
geo_ip = pygeoip.GeoIP('GeoIP.dat')
return geo_ip.country_code_by_addr(ip)
# Convert to a local frame; only selective columns
# Map the IP to a country from where the client is browsing
browser_frame = log_lines_df.select(
'request_header_user_agent__browser__family', 'remote_host',
'time_received_isoformat').withColumn(
'country', F.udf(to_country)(F.col('remote_host'))).toPandas()
# Display the preview
pd_display(browser_frame, 'Timed traffic to the website')
Normalize to a local time
%matplotlib inline
from datetime import datetime
# The current browser time is in UTC time zone. Let us normalize that to current time
offset = datetime.now() - datetime.utcnow() # Find offset first
# project the ISO time column to a local time column
browser_frame['local_time'] = browser_frame['time_received_isoformat'].apply(
lambda x: pd.to_datetime(x) + pd.to_timedelta(offset, unit='s'))
# Offset the recorded time to local time zone and reindex the series
# Make the time axis labels pretty
browser_frame.set_index(pd.DatetimeIndex(browser_frame.local_time), inplace=True)
pd_display(browser_frame, 'Timed index normalized to local timezone')
Cumulate count of browsers, countries-of-origin on a 4 hour basis and plot
# Pivot first so all status codes become columns
# Similarly pivot by countries too
def group_count(df, feature, resample='4H'):
return df.groupby([pd.TimeGrouper(resample), feature])[feature].count().unstack(feature).fillna(0)
# Create a shortcut macro
counts = lambda feature: group_count(browser_frame, feature)
# Compute and show a count of browsers
browsers = counts('request_header_user_agent__browser__family')
# Show
pd_display(browsers, "Browser counts along time")
# Compute and show countries of origin by the hour
countries = counts('country')
# Show
pd_display(countries, "Country counts along time")
# Plot the time series chart
# We will add counts in integer form, so project everything as int
chart_data = browsers[['IE', 'Chrome', 'Safari', 'Firefox', 'Opera', 'Android']].astype(int)
# Make the time axis labels pretty
from matplotlib.dates import IndexDateFormatter, DateFormatter
from datetime import datetime
fmt = DateFormatter('%m-%d %H:%M, %a')
chart_data['Time'] = chart_data.index
# Format X axis in a friendly format
from matplotlib.dates import IndexDateFormatter, DateFormatter
fmt = DateFormatter('%m-%d %H:%M, %a')
chart_data['Time'] = chart_data['Time'].apply(lambda x: fmt.strftime(x))
chart_data.set_index('Time', inplace=True)
# ============================
# Create a figure
fig, ax = plt.subplots(2, 1, figsize=(20,10))
sns.set_style("whitegrid")
# Plot the bar chart
chart_data.plot(kind='bar', ax=ax[0])
# Plot the heatmap on the same axis
sns.heatmap(chart_data.T, annot=True, fmt="d", ax=ax[1], cbar=False)
ax[1].set(xlabel='Time', ylabel='Browser')
fig.autofmt_xdate()
Let us also plot a geomap of where the client traffic is emanating from. Presumably, it varies over time. Since we have a moving window, let us use a interactive wizard for user to select a specific window of time when this responsive heatmap is desired.
# First let us geocode the country centers
from geopy.geocoders import Nominatim
geo_locator = Nominatim()
def get_latlng(ctry):
result = geo_locator.geocode(ctry)
(lat, lon) = (result[1][0], result[1][1]) if result else (0, 0)
return (lat, lon)
# Make a dataframe with country short codes and lat/long codes
countries_data = pd.DataFrame(countries.columns.values, columns=['countries'])
countries_data['latlng'] = countries_data.countries.apply(lambda ctry: get_latlng(ctry))
countries_data['latitudes'] = countries_data['latlng'].apply(lambda x: float(x[0]))
countries_data['longitudes'] = countries_data['latlng'].apply(lambda x: float(x[1]))
# Save this data for future time-lapsed heatmap plots
countries_data.set_index('countries', inplace=True)
# To make the geo map interactive so users can select a time, create a interactive drop-down wizard
# Make a map
import folium
import folium.plugins as plugins
# The following widgets allow for interactively progressing a time-lapse window and studying the heatmap
from IPython.html.widgets import interact
from IPython.html import widgets
from folium.plugins import HeatMap
# At a give chosen time, plot a heatmap of countries where clients are
def plot_traffic(Time):
x = list(countries.index.values)[Time]
# First print the time back so users have better since of the observation window
display(
HTML("<h4>Heatmap at {0}</h4>".format(
fmt.strftime(pd.to_datetime(str(x))))))
# Select frequencies at the user-chosen time and join with lat-long intelligence
ctry = countries[countries.index == x].T.join(countries_data)
ctry.rename(columns={ctry.columns[0]: 'timeval'}, inplace=True)
#Now we have lat, long, and freq of country information for the hour in a singular row
ctry = ctry[ctry[ctry.columns[0]] > 0]
# Make a brand new map. I wish we could re use the prior map. TODO
countries_map = folium.Map(location=[
countries_data['latitudes'].mean(), countries_data['longitudes'].mean()
], zoom_start=2) # Always zoom out to the world
# Make a group layer
feature_group = folium.FeatureGroup(name=str(x))
# Make a heatmap
heatmap = folium.plugins.HeatMap(
ctry[['latitudes', 'longitudes', 'timeval']].values, radius=15)
# Add the heatmap to the world
feature_group.add_children(heatmap)
countries_map.add_children(feature_group)
# return the map to paint on the canvas
return countries_map
# Display an interactive slider for user to choose a time when the heatmap is to be visualized
# Place the slider to the middle of the ibservation window
display(
interact(
plot_traffic,
Time=widgets.IntSlider(
value=len(countries.index.values)/2, min=0, max=len(countries.index.values))));
Though this is a simple BI dashboard, it goes to demonstrate how easy it is to build operational intelligence into your business systems so you are not caught off-guard when business metrics start to spike-up or trend-down. While this isn't preemptive intelligence, it surely paves way for proactive intelligence.