Kafka Python and Google Analytics
Learn how to use Kafka Python to pull Google Analytics metrics and push them to your Kafka Topic. This will allow us to analyze this data later using Spark to give us meaningful business data.
Google Analytics is a very powerful platform for monitoring your website's metrics, including top pages, visitors, bounce rate, etc. As more and more businesses adopt Big Data processes, it becomes increasingly advantageous to compile as much data as possible. The more data you have available, the more options you have for analyzing it and producing some very interesting results that can help you shape your business.
This article assumes that you already have a running Kafka cluster. If you don't, then please follow my article Kafka Tutorial for Fast Data Architecture to get a cluster up and running. You will also need a topic already created for publishing the Google Analytics metrics to; the aforementioned article covers that procedure as well. I created a topic called admintome-ga-pages since we will be collecting Google Analytics (ga) metrics about my blog's pages.
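If you would rather create the topic from code instead of the Kafka command-line tools, kafka-python ships an admin client that can do it. This is only a minimal sketch; the broker address below is a placeholder for your own cluster:
from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the cluster (replace the broker address with your own)
admin = KafkaAdminClient(bootstrap_servers=['mslave1.admintome.lab:31000'])

# One partition and one replica is plenty for this example pipeline
admin.create_topics([
    NewTopic(name='admintome-ga-pages', num_partitions=1, replication_factor=1)
])
admin.close()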
In this article, I will walk you through how to pull metrics data from Google Analytics for your site then take that data and push it to your Kafka Cluster. In a later article, we will cover how to take that data that is consumed in Kafka and analyze it to give us some meaningful business data.
This is just one single pipeline of data that we are going to analyze. You could build many, many more pipelines based on the process that we will be covering, so keep this post handy as you continue your journey in Big Data.
The first thing that we need to do is enable the Google Analytics API.
The Google Analytics API
The Google Analytics API is actually many APIs that let you interact with Google Analytics. They range from APIs for building applications on top of Google Analytics functionality all the way to real-time reporting (which honestly sounds pretty awesome and might be a future article topic). In this article, we will focus on the Core Reporting API, which will give us a list of pages for the last 7 days and how many people visited each page.
Enabling the Core Reporting API
The first step we need to take is enabling the API. Open up the Service Accounts page. Select Create a project and click on Continue.
Next, click on Create Service Account.
Then click on Go to credentials.
You will see the Add credentials to your project screen. Select Google Analytics Reporting API for 'Which API are you using?' For 'Where will you be calling the API from?', select Other non-UI because we are going to write a Python daemon that will use the API to pull the metrics. Select Application Data for 'What data will you be accessing?' Lastly, answer No to 'Are you planning to use this API with App Engine or Compute Engine?' since our Python application will run on our own cluster. Click Continue to move on to the next section.
Now on to where we create the service account.
Under Service account name, give your service account a meaningful name. I used kafka-intake as the name for my service account. Under the Role drop-down, select Service Account and then Service Account User. Next, select JSON as the key type. Finally, click on Continue. The key file will be downloaded to your system. Take note of the Service Account ID field; you will need it in the next step.
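The downloaded key is a JSON file. A trimmed example of roughly what it contains is shown below (all values here are placeholders); the client_email value is the Service Account ID you will grant access to in the next step:
{
  "type": "service_account",
  "project_id": "your-project-id",
  "private_key_id": "0123456789abcdef",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "kafka-intake@your-project-id.iam.gserviceaccount.com",
  "client_id": "000000000000000000000"
}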
Add Your Service Account to the Google Analytics Account
Go to your Google Analytics page and sign in. Go to Settings and click on User Management. Click on the blue plus sign in the upper right corner to add a new user, then click on Add new users. You will see the Add new user dialog:
When you are done, click on the blue Add button at the top right-hand corner. You now have the Google Analytics API enabled and you are ready to securely pull metrics from it. In the next section, we will walk through writing our Python application that will pull this data and push it to Kafka.
Kafka Python Example
Now that we got through the boring part, it is finally time to roll up our sleeves and start some development! If you just want the code then check out my GitHub repository.
We need to create a Python environment for our application by running the following commands (may differ for Mac and Windows users).
$ mkdir analytics-intake
$ cd analytics-intake/
$ virtualenv ../venvs/analytics-intake
$ source ../venvs/analytics-intake/bin/activate
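If you are on Python 3 and don't have virtualenv installed, the standard library's venv module works just as well here:
$ python3 -m venv ../venvs/analytics-intake
$ source ../venvs/analytics-intake/bin/activate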
This assumes that you have a venvs directory above your application directory (analytics-intake for this example). Next, we need to install some required Python modules.
$ pip install kafka-python
$ pip install --upgrade google-api-python-client
$ pip install --upgrade oauth2client
Next, freeze your requirements to a text file.
$ pip freeze > requirements.txt
Time to write our main class and get things started.
AnalyticsIntake.py
Our first file is called AnalyticsIntake.py and is our main script. The first thing we will need is our imports:
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from mykafka import MyKafka
import logging
from logging.config import dictConfig
import time
import os
from datetime import datetime, timedelta
The first two imports are required by the Google Analytics API Python client. Next, we need to define our class and its member variables:
class AnalyticsIntake(object):
    SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
    KEY_FILE_LOCATION = None
    VIEW_ID = None
    kafka_brokers = None
    topic = None
    delay = 3600
In order to call the Google Analytics API, we need several variables set. The first one is the SCOPES variable, which is the API scope that our code will request access to. Next is the KEY_FILE_LOCATION variable, which is the path to the credentials key file that was downloaded earlier when we enabled the Google Analytics API. The VIEW_ID variable can be found by using the Account Explorer for Google Analytics. The rest of the variables we will cover when we talk about the Kafka code.
Now we need to write our constructor for the AnalyticsIntake class.
def __init__(self):
    self.init_logging()
    if 'KAFKA_BROKERS' in os.environ:
        self.kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        self.logger.info(
            "Set KAFKA_BROKERS: {}".format(self.kafka_brokers))
    else:
        raise ValueError('KAFKA_BROKERS environment variable not set')
    if 'KEY_FILE' in os.environ:
        self.KEY_FILE_LOCATION = os.environ['KEY_FILE']
        self.logger.info("Set KEY_FILE: {}".format(self.KEY_FILE_LOCATION))
    else:
        raise ValueError('KEY_FILE environment variable not set')
    if 'VIEW_ID' in os.environ:
        self.VIEW_ID = os.environ['VIEW_ID']
        self.logger.info("Set VIEW_ID: {}".format(self.VIEW_ID))
    else:
        raise ValueError('VIEW_ID environment variable not set')
    if 'TOPIC' in os.environ:
        self.topic = os.environ['TOPIC']
        self.logger.info("Set TOPIC: {}".format(self.topic))
    else:
        raise ValueError('TOPIC environment variable not set')
    if 'DELAY' in os.environ:
        # Convert to int so the delay can be used in the sleep arithmetic and timedelta below
        self.delay = int(os.environ['DELAY'])
        self.logger.info("Set DELAY: {} s".format(self.delay))
    else:
        self.delay = 3600
        self.logger.info(
            "DELAY environment variable not set - Setting to default {} s".format(self.delay))
Here, we pull the values for all of these variables from environment variables. This is because we eventually want to put this code into a Docker container, and we don't want to bake our key file into the image; your key file should always be kept secret. If any of the required environment variables are not set, we raise a ValueError exception and the application exits. Only one of these variables has a default, and that is the delay variable: the number of seconds to wait between polls to Google Analytics. Note that DELAY is converted to an int so it can be used in the sleep arithmetic later. The default is 3600 seconds, which is equivalent to one hour.
We are going to want to use logging, so let's set up a member function to initialize it.
def init_logging(self):
    logging_config = dict(
        version=1,
        formatters={
            'f': {'format':
                  '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
        },
        handlers={
            'h': {'class': 'logging.StreamHandler',
                  'formatter': 'f',
                  'level': logging.INFO}
        },
        root={
            'handlers': ['h'],
            'level': logging.INFO,
        },
    )
    self.logger = logging.getLogger()
    logging.getLogger("googleapiclient").setLevel(logging.ERROR)
    dictConfig(logging_config)
Notice the line near the end that sets the googleapiclient logger to ERROR. This prevents some warnings from popping up in our output logs.
Now we will create a member function to initialize our Google Analytics Reporting.
def init_reporting(self):
    creds = ServiceAccountCredentials.from_json_keyfile_name(
        self.KEY_FILE_LOCATION, self.SCOPES)
    analytics = build('analyticsreporting', 'v4', credentials=creds)
    return analytics
This creates a ServiceAccountCredentials object from our KEY_FILE_LOCATION and SCOPES variables. We then pass those credentials to the build function from googleapiclient.discovery to set up our Analytics Reporting API client.
Next, we will write a get_reports member function that we will use to pull the metrics that we want.
def get_reports(self, analytics):
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': self.VIEW_ID,
                    'dateRanges': [{'startDate': '7daysAgo', 'endDate': 'today'}],
                    'metrics': [{'expression': 'ga:sessions'}],
                    'dimensions': [{'name': 'ga:pageTitle'}]
                }]
        }
    ).execute()
There is a lot going on in this code, so let's break it down. When you pull metrics from Google Analytics, you need to specify which metrics you want. In this case, we are pulling the ga:sessions metric, which gives us the total number of sessions. With every metric you pull, you also need to pull at least one dimension. In this case, we are using the ga:pageTitle dimension, which gives us the page titles for our website. There are a huge number of possible combinations of metrics and dimensions. To make it easy, Google provides the Dimensions and Metrics Explorer, which lets you pick the data that you need.
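As a hypothetical example (not part of our application), a report request for page views broken down by page path and day would look something like this; you would drop it into the reportRequests list just like the request above:
# Example report request: page views per URL path per day over the last 30 days
report_request = {
    'viewId': 'YOUR_VIEW_ID',
    'dateRanges': [{'startDate': '30daysAgo', 'endDate': 'today'}],
    # ga:pageviews counts every page load, unlike ga:sessions
    'metrics': [{'expression': 'ga:pageviews'}],
    # Two dimensions: the URL path and the date it was viewed
    'dimensions': [{'name': 'ga:pagePath'}, {'name': 'ga:date'}]
}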
Next, we will write a member function that will simplify pulling these metrics for us.
def get_page_visit_data(self):
    analytics = self.init_reporting()
    response = self.get_reports(analytics)
    return response
This returns the metrics to us as JSON-style data (a Python dictionary). Here is a sample of what I got back:
response:{
'reports':[
{
'columnHeader':{
'dimensions':[
'ga:pageTitle'
],
'metricHeader':{
'metricHeaderEntries':[
{
'name':'ga:sessions',
'type':'INTEGER'
}
]
}
},
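Further down in the response, each report has a data section whose rows pair a page title with its session count. Our daemon ships the raw response to Kafka as-is, but purely for illustration, a minimal sketch of pulling those pairs out of the response (assuming the structure above) could look like this:
def extract_page_sessions(response):
    # Each row pairs the dimension values (page title) with the metric values (sessions)
    pages = []
    for report in response.get('reports', []):
        for row in report.get('data', {}).get('rows', []):
            title = row['dimensions'][0]
            sessions = int(row['metrics'][0]['values'][0])
            pages.append((title, sessions))
    return pages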
Finally, we need to write our main member function that will run everything for us.
def main(self):
    starttime = time.time()
    self.logger.info('Starting Google Analytics API Intake Daemon')
    while True:
        self.logger.info('Pulling site data from Google Analytics API')
        response = self.get_page_visit_data()
        self.logger.info(
            'Got back data of type: {}'.format(type(response)))
        self.logger.info(
            'Successfully pulled site data from Google Analytics API')
        now = datetime.now()
        self.logger.info('Scheduling next run at {}'.format(
            now + timedelta(seconds=self.delay)))
        time.sleep(self.delay - ((time.time() - starttime) % self.delay))


if __name__ == '__main__':
    intake = AnalyticsIntake()
    intake.main()
This is simply an infinite loop: each pass pulls the data and then sleeps until the next scheduled run. The sleep expression subtracts the time already spent (modulo self.delay, which defaults to 3600 seconds) so that runs stay aligned to the original start time, and it repeats until you break out of the execution. Save the file and exit. The next section will cover pushing our JSON metrics data to our Kafka cluster.
Sending Metrics to Kafka
Create a new file called mykafka.py and add the following contents.
from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        # Serialize every value to JSON before it is sent to the brokers
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data, topic):
        # send() is asynchronous; get() blocks until the broker acknowledges the record
        result = self.producer.send(topic, json_data)
        print("kafka send result: {}".format(result.get()))
In our constructor we set up our Kafka producer. We tell it that we want to send JSON data to our topic by using the value_serializer parameter, and we give it the Kafka broker list that we want to publish to via bootstrap_servers. This is a list containing the hostname and port of each of our brokers:
[ "mslave1.admintome.lab:31000" ]
This class has only one other member function, which we will utilize to publish messages to our Kafka topic. It simply uses the producer's send method to send data to our topic.
We capture the result of the send method and log it by using result.get(). There wasn't much documentation on how to do this, so take note. The result turns out to be of type kafka.producer.future.FutureRecordMetadata, which I had a hell of a time finding any Python information for. So when in doubt, go to the code. That showed a get() member function I could use to get the status, and sure enough that is what we get:
kafka send result: RecordMetadata(topic='admintome-ga-pages', partition=0, topic_partition=TopicPartition(topic='admintome-ga-pages', partition=0), offset=18, timestamp=1530244755191, checksum=None, serialized_key_size=-1, serialized_value_size=21583)
This was the result of me pushing to my admintome-ga-pages topic in Kafka.
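Calling get() blocks until the broker acknowledges the record. If you would rather not block, kafka-python also lets you attach callbacks to the returned future; here is a quick sketch (not part of our application, with producer, topic, and json_data standing in for the names used above):
def on_send_success(record_metadata):
    print("delivered to {} partition {} offset {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))


def on_send_error(exc):
    print("delivery failed: {}".format(exc))


# Attach callbacks instead of blocking on get()
producer.send(topic, json_data).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()  # make sure everything buffered is actually sent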
Save and exit the file.
We need to update our AnalyticsIntake.py file to make use of our MyKafka class. Add a new member function to the AnalyticsIntake class called publish_metrics:
def publish_metrics(self, logger, response):
    logger.info(
        'Publishing site data to Kafka Broker {}'.format(self.kafka_brokers))
    mykafka = MyKafka(self.kafka_brokers)
    mykafka.send_page_data(response, self.topic)
    logger.info(
        'Successfully published site data to Kafka Broker {}'.format(self.kafka_brokers))
This simply instantiates our MyKafka class with the broker list we read from the KAFKA_BROKERS environment variable and calls its send_page_data member function.
Next, update the main member function of the AnalyticsIntake class:
def main(self):
    starttime = time.time()
    self.logger.info('Starting Google Analytics API Intake Daemon')
    while True:
        self.logger.info('Pulling site data from Google Analytics API')
        response = self.get_page_visit_data()
        self.logger.info(
            'Got back data of type: {}'.format(type(response)))
        self.logger.info(
            'Successfully pulled site data from Google Analytics API')
        self.publish_metrics(self.logger, response)
        now = datetime.now()
        self.logger.info('Scheduling next run at {}'.format(
            now + timedelta(seconds=self.delay)))
        time.sleep(self.delay - ((time.time() - starttime) % self.delay))
Save and exit the file.
Running Our Kafka Python Example
Now we have everything ready to run our application except for setting our environment variables:
$ export KAFKA_BROKERS="broker.example.com:31000"
$ export KEY_FILE="keyfile name"
$ export VIEW_ID="00000000"
$ export TOPIC="admintome-ga-pages"
Substitute the actual values you gathered earlier in this article.
Finally, run the application:
$ python AnalyticsIntake.py
2018-06-28 23:12:07,851 root INFO Set KAFKA_BROKERS: ['mslave1.admintome.lab:31000']
2018-06-28 23:12:07,851 root INFO Set KEY_FILE: .json
2018-06-28 23:12:07,851 root INFO Set VIEW_ID:
2018-06-28 23:12:07,851 root INFO Set TOPIC: admintome-ga-pages
2018-06-28 23:12:07,851 root INFO DELAY environment variable not set - Setting to default 3600 s
2018-06-28 23:12:07,851 root INFO Starting Google Analytics API Intake Daemon
2018-06-28 23:12:07,851 root INFO Pulling site data from Google Analytics API
2018-06-28 23:12:08,900 root INFO Successfully pulled site data from Google Analytics API
2018-06-28 23:12:08,900 root INFO Publishing site data to Kafka Broker ['mslave1.admintome.lab:31000']
kafka send result: RecordMetadata(topic='admintome-ga-pages', partition=0, topic_partition=TopicPartition(topic='admintome-ga-pages', partition=0), offset=19, timestamp=1530245529016, checksum=None, serialized_key_size=-1, serialized_value_size=21583)
2018-06-28 23:12:09,030 root INFO Successfully published site data to Kafka Broker ['mslave1.admintome.lab:31000']
2018-06-28 23:12:09,030 root INFO Scheduling next run at 2018-06-29 00:12:09.030824
The application will continue to run until you hit [CTRL]-[C].
Building a Docker Container
We eventually want to deploy this application to Marathon. The best way to do this is to build a Docker container. If you clone the admintome/analytics-intake repository you will find a Dockerfile that you can use to build a Docker container and push it to your Docker repository.
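I won't reproduce the repository's Dockerfile here, but a minimal sketch of one that would work for this application looks something like the following (the key file should be mounted or injected at run time rather than baked into the image):
FROM python:3.6-slim

WORKDIR /app

# Install the pinned dependencies first so Docker can cache this layer
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY AnalyticsIntake.py mykafka.py ./

# KAFKA_BROKERS, KEY_FILE, VIEW_ID, TOPIC (and optionally DELAY) are supplied at run time
CMD ["python", "AnalyticsIntake.py"]
For a quick local test you could run the image with the key file mounted as a volume, something along these lines:
$ docker run -d \
    -v /path/to/keyfile.json:/app/keyfile.json \
    -e KAFKA_BROKERS="broker.example.com:31000" \
    -e KEY_FILE="/app/keyfile.json" \
    -e VIEW_ID="00000000" \
    -e TOPIC="admintome-ga-pages" \
    [docker repo username]/analytics-intake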
Deploying the Application to Marathon
After you have built your Docker container and pushed it to a Docker repository (like Docker Hub), you will need to do a docker pull on all of your Mesos slaves. Then create a new Marathon application using this JSON:
{
  "id": "analytics-intake",
  "cmd": null,
  "cpus": 1,
  "mem": 128,
  "disk": 0,
  "instances": 1,
  "container": {
    "docker": {
      "image": "[docker repo username]/analytics-intake"
    },
    "type": "DOCKER"
  },
  "networks": [
    {
      "mode": "host"
    }
  ],
  "env": {
    "KAFKA_BROKERS": "mslave1.admintome.lab:31000",
    "KEY_FILE": "something.json",
    "VIEW_ID": "00000000",
    "TOPIC": "admintome-ga-pages"
  }
}
Make sure to substitute your correct values including your Docker repo username in the docker:image section.
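To actually create the application, you can POST this JSON to Marathon's REST API. A quick example, assuming Marathon is reachable at marathon.example.com:8080 and the JSON above is saved as analytics-intake.json:
$ curl -X POST http://marathon.example.com:8080/v2/apps \
    -H "Content-Type: application/json" \
    -d @analytics-intake.json
You can also paste the JSON directly into the Marathon UI when creating a new application.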
What’s Next?
You now have a Python application deployed to Marathon that can pull metrics from Google Analytics and push that data in the form of JSON to your Kafka cluster.
The next step is to take that raw data and analyze it to produce some meaningful and actionable results. We will do this using Apache Spark in a future article. Thanks for sticking with me through this article. If you enjoyed it, please do me a solid and throw up a link to it.