
Consuming Data via Kafka Streaming


Introduction

This document provides instructions on how to consume messages streamed from Mobito to a Kafka topic hosted on AWS MSK.

The example below uses Python to set up a KafkaConsumer.

The packages required are: kafka-python, aws-msk-iam-sasl-signer-python, grpcio-tools, protobuf and lz4.

Documentation:

For other languages supporting MSK IAM authentication, refer to the AWS MSK IAM client configuration guide.


Installation

1. Configure AWS Profile

Set up a local AWS profile by running aws configure. You will be prompted for your access key, secret key and default region. Use the keys provided by Mobito and eu-west-1 as the region.

aws configure creates the default AWS profile. To create a named profile instead, run aws configure --profile <profile_name>. If you use a named profile, you need to set the environment variable AWS_PROFILE=<profile_name> (for example, export AWS_PROFILE=<profile_name> in bash) in the shell where you run the client.
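The AWS CLI stores these keys in an INI-style credentials file (by default ~/.aws/credentials) with one section per profile: [default] for aws configure, or [<profile_name>] for aws configure --profile <profile_name>. The standard-library sketch below reports which profile the client would pick up from AWS_PROFILE and whether that profile has both keys. The function names are illustrative; it assumes the default file location and ignores other credential sources (such as key environment variables or a custom AWS_SHARED_CREDENTIALS_FILE).

```python
import configparser
import os
from pathlib import Path


def active_profile(env=None):
    """Return the profile name in use: AWS_PROFILE if set, otherwise 'default'."""
    env = os.environ if env is None else env
    return env.get("AWS_PROFILE") or "default"


def profile_has_keys(credentials_text, profile):
    """Check that the credentials file text defines both keys for `profile`."""
    parser = configparser.ConfigParser()
    parser.read_string(credentials_text)
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return "aws_access_key_id" in section and "aws_secret_access_key" in section


if __name__ == "__main__":
    # Assumption: credentials were written to the default location by `aws configure`.
    path = Path.home() / ".aws" / "credentials"
    profile = active_profile()
    text = path.read_text() if path.exists() else ""
    print(f"profile={profile!r} configured={profile_has_keys(text, profile)}")
```

If this prints configured=False, re-run aws configure (with --profile if you use a named profile) before starting the client.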

2. Set Up Python Environment

Create and activate a virtual environment, then install the required packages. You can replace env with any name you like.

```shell
python3 -m venv env
source env/bin/activate
pip install kafka-python aws-msk-iam-sasl-signer-python grpcio-tools protobuf lz4
```
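To confirm the installation succeeded inside the activated environment, a small standard-library check can be run. This is a sketch: the package list mirrors the pip command above, and the helper name is illustrative.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names exactly as passed to pip above.
REQUIRED = [
    "kafka-python",
    "aws-msk-iam-sasl-signer-python",
    "grpcio-tools",
    "protobuf",
    "lz4",
]


def missing_packages(names):
    """Return the distributions from `names` that are not installed."""
    missing = []
    for name in names:
        try:
            version(name)  # raises PackageNotFoundError if the distribution is absent
        except PackageNotFoundError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_packages(REQUIRED)
    print("All packages installed" if not missing else f"Missing: {', '.join(missing)}")
```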

3. Compile the provided .proto file

Run the following command in the directory containing the provided vehicle_data.proto file. It generates the Python module vehicle_data_pb2.py, which the Python client below imports.

```shell
python -m grpc_tools.protoc -I. --python_out=. vehicle_data.proto
```
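To check that the generated module can be imported under the name the client expects, you could run a sketch like this from the same directory as the client. The helper names are illustrative; the naming rule shown (proto file stem plus _pb2) matches the vehicle_data.proto to vehicle_data_pb2.py mapping described above.

```python
import importlib.util
from pathlib import Path


def pb2_module_name(proto_file):
    """For a file such as vehicle_data.proto, protoc emits <stem>_pb2.py."""
    return Path(proto_file).stem + "_pb2"


def is_importable(module_name):
    """True if a top-level module can be found on the current sys.path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    name = pb2_module_name("vehicle_data.proto")
    status = "found" if is_importable(name) else "not found, re-run protoc here"
    print(f"{name}: {status}")
```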

4. Python Client

Create a Python file with the following code and replace the placeholders <BROKERS_URLS>, <TOPIC_NAME> and <USERNAME> with the values provided by Mobito. <BROKERS_URLS> is a comma-separated list of broker endpoints.

```python
import json  # optional: e.g. json.dumps(MessageToDict(message.value))
import datetime

from kafka import KafkaConsumer
from kafka.sasl.oauth import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from google.protobuf.json_format import MessageToDict  # optional: protobuf -> dict

import vehicle_data_pb2  # generated in step 3

# Comma-separated list of broker endpoints provided by Mobito
BROKERS_STRING = "<BROKERS_URLS>"
TOPIC_NAME = "<TOPIC_NAME>"
GROUP_ID = "<USERNAME>"  # must start with your username (see the naming convention below)


class MSKTokenProvider(AbstractTokenProvider):
    """Supplies MSK IAM auth tokens for SASL/OAUTHBEARER using your AWS credentials."""

    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token("eu-west-1")
        return token


consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=BROKERS_STRING.split(","),
    value_deserializer=vehicle_data_pb2.VehicleData.FromString,  # bytes -> VehicleData
    auto_offset_reset="earliest",  # used when the group has no committed offset yet
    api_version=(3, 8, 0),
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=MSKTokenProvider(),
    group_id=GROUP_ID,
)

# message.value is already a deserialized VehicleData object.
# Replace the print below with your own processing; this example only prints the timestamp.
for message in consumer:
    time = datetime.datetime.fromtimestamp(message.timestamp / 1000.0)
    print(f"Received message at {time} (Timestamp: {message.timestamp})")
```
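The loop above converts message.timestamp (milliseconds since the Unix epoch) with datetime.fromtimestamp, which returns a naive datetime in the machine's local time zone. If you prefer UTC, or want to test your processing logic without a live cluster, one option is to move the per-message work into a plain function. This sketch assumes only that each record has a .timestamp in milliseconds and a .value; the Record type is a hypothetical stand-in for kafka-python's ConsumerRecord.

```python
import datetime
from collections import namedtuple

# Hypothetical stand-in for a consumed record: only the two attributes used below.
Record = namedtuple("Record", ["timestamp", "value"])


def record_time_utc(record):
    """Convert a Kafka timestamp (ms since epoch) to a timezone-aware UTC datetime."""
    return datetime.datetime.fromtimestamp(record.timestamp / 1000.0, tz=datetime.timezone.utc)


def handle(record):
    """Per-message processing; replace the body with your own logic."""
    received = record_time_utc(record)
    return f"Received message at {received.isoformat()} (Timestamp: {record.timestamp})"


if __name__ == "__main__":
    # In the client, this would become: for message in consumer: print(handle(message))
    print(handle(Record(timestamp=1_700_000_000_000, value=None)))
```

Because handle only depends on the two attributes, the same function can be called with real consumer messages or with stub records in unit tests.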


Consumer Group Naming Convention

Consumer group names follow the pattern <username>*, meaning you can use any group name that starts with your username. For example, with the username mobito, you can use mobito-test to test the connection and then switch to mobito as the consumer group in production. Because the new group has no committed offsets, the client's auto_offset_reset='earliest' setting makes it consume from the beginning of the topic (the earliest message still retained).

Replace mobito with your own username.

