Introduction
This document provides instructions on how to consume messages streamed from Mobito to a Kafka topic hosted on AWS MSK.
The example below uses Python to set up a KafkaConsumer.
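The main packages required are:
kafka-python: Kafka client for Python
aws-msk-iam-sasl-signer-python: IAM-based authentication for AWS MSK
The installation step below also adds grpcio-tools and protobuf (to compile and decode the provided .proto schema) and lz4 (needed by kafka-python to decompress LZ4-compressed messages).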
Documentation:
For other languages supporting MSK IAM authentication, refer to the AWS MSK IAM client configuration guide.
Installation
1. Configure AWS Profile
Set up a local AWS profile by running aws configure. You will be prompted for your access key ID, secret access key, and default region (eu-west-1). Use the keys provided by Mobito.
By default, aws configure writes to the default profile. To create a named profile instead, run aws configure --profile <profile_name>. If you use a named profile, set the environment variable AWS_PROFILE=<profile_name> before running the client (for example, export AWS_PROFILE=<profile_name>).
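Before running the client, it can help to confirm which profile will be picked up and that aws configure stored keys for it. The following is a minimal sketch, not part of the AWS CLI or the MSK signer: active_profile and profile_has_keys are hypothetical helpers. It assumes the standard credentials file location (~/.aws/credentials) with one [<profile_name>] section per profile, and it simplifies profile resolution to "AWS_PROFILE if set, otherwise default", ignoring other credential sources such as access keys in environment variables.

```python
import configparser
import os
from pathlib import Path
from typing import Optional


def active_profile() -> str:
    """Simplified resolution: the profile named by AWS_PROFILE, otherwise "default"."""
    return os.environ.get("AWS_PROFILE") or "default"


def profile_has_keys(profile: str, credentials_file: Optional[Path] = None) -> bool:
    """Return True if the credentials file holds an access key pair for `profile`."""
    # Assumption: the standard location written by `aws configure`
    path = credentials_file or Path.home() / ".aws" / "credentials"
    # interpolation=None so values containing '%' are read literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)  # a missing file is silently skipped, leaving no sections
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return bool(section.get("aws_access_key_id")) and bool(
        section.get("aws_secret_access_key")
    )


if __name__ == "__main__":
    name = active_profile()
    status = "found" if profile_has_keys(name) else "NOT found"
    print(f"AWS profile '{name}': access keys {status}")
```

Running the script prints whether keys were found for the profile the client would use; if it reports NOT found, re-run aws configure (with --profile if needed).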
2. Set Up Python Environment
Create and activate a virtual environment, then install the required packages. You can replace env with a name of your choice.
python3 -m venv env
source env/bin/activate
pip install kafka-python aws-msk-iam-sasl-signer-python grpcio-tools protobuf lz4
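To check that the virtual environment is complete before going further, a small standard-library script can look up the import name of each package installed above. This is a sketch: missing_packages and the pip-name-to-import-name mapping are illustrative assumptions, not something the packages themselves provide.

```python
import importlib.util
from typing import Dict, List, Optional

# pip package name -> module name it is imported as (assumed mapping)
REQUIRED_MODULES: Dict[str, str] = {
    "kafka-python": "kafka",
    "aws-msk-iam-sasl-signer-python": "aws_msk_iam_sasl_signer",
    "grpcio-tools": "grpc_tools",
    "protobuf": "google.protobuf",
    "lz4": "lz4",
}


def missing_packages(required: Optional[Dict[str, str]] = None) -> List[str]:
    """Return the pip names of packages whose modules cannot be found."""
    required = REQUIRED_MODULES if required is None else required
    missing = []
    for pip_name, module_name in required.items():
        try:
            found = importlib.util.find_spec(module_name) is not None
        except (ModuleNotFoundError, ValueError):
            # find_spec imports the parent of a dotted name (e.g. "google"),
            # which raises if the parent package itself is absent
            found = False
        if not found:
            missing.append(pip_name)
    return missing


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing packages, install with: pip install " + " ".join(missing))
    else:
        print("All required packages are installed.")
```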
3. Compile the provided .proto file
Run the following command in the directory that contains the provided vehicle_data.proto file. It generates vehicle_data_pb2.py, which the Python client below imports.
python -m grpc_tools.protoc -I. --python_out=. vehicle_data.proto
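If you prefer to compile the schema from a Python script (for example, as part of a setup step), the command above can be wrapped with subprocess. A minimal sketch follows; the helper names are hypothetical, it assumes grpcio-tools is installed in the interpreter running the script, and it assumes the .proto file sits directly in the include directory, in which case protoc names the output <name>_pb2.py.

```python
import subprocess
import sys
from pathlib import Path
from typing import List


def protoc_command(proto_file: str, include_dir: str = ".", out_dir: str = ".") -> List[str]:
    """Build the same grpc_tools.protoc invocation as the shell command above."""
    return [
        sys.executable, "-m", "grpc_tools.protoc",  # current (venv) interpreter
        f"-I{include_dir}",
        f"--python_out={out_dir}",
        proto_file,
    ]


def expected_module(proto_file: str, out_dir: str = ".") -> Path:
    """Output path for a .proto located directly in the include directory."""
    return Path(out_dir) / (Path(proto_file).stem + "_pb2.py")


def compile_proto(proto_file: str = "vehicle_data.proto") -> Path:
    """Run protoc and return the generated module path; raise if it failed."""
    subprocess.run(protoc_command(proto_file), check=True)
    output = expected_module(proto_file)
    if not output.exists():
        raise FileNotFoundError(f"protoc did not produce {output}")
    return output
```

For example, compile_proto("vehicle_data.proto") should leave vehicle_data_pb2.py in the current directory, or raise subprocess.CalledProcessError if protoc reports an error.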
4. Python Client
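Create a Python file with the following code in the same directory as vehicle_data_pb2.py, and replace the placeholders <BROKERS_URLS>, <TOPIC_NAME> and <USERNAME> with the values provided by Mobito. <BROKERS_URLS> is a comma-separated list of broker addresses.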
import json
import datetime
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka.sasl.oauth import AbstractTokenProvider
from google.protobuf.json_format import MessageToDict
import vehicle_data_pb2

BROKERS_STRING = "<BROKERS_URLS>"  # comma-separated broker list provided by Mobito

class MSKTokenProvider(AbstractTokenProvider):
    def token(self):
        # Sign a short-lived IAM auth token using the default AWS credential chain
        # (which honors AWS_PROFILE)
        token, _ = MSKAuthTokenProvider.generate_auth_token('eu-west-1')
        return token

tp = MSKTokenProvider()
topic = "<TOPIC_NAME>"

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=[broker.strip() for broker in BROKERS_STRING.split(',') if broker.strip()],
    # Decode each message payload into a VehicleData protobuf object
    value_deserializer=lambda v: vehicle_data_pb2.VehicleData.FromString(v),
    auto_offset_reset='earliest',  # used only when the group has no committed offsets
    api_version=(3, 8, 0),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    group_id='<USERNAME>',  # Change to your username (see the naming convention below)
)

# At this point the data can be processed as you wish; handle each message in the loop below.
# This example prints the timestamp of each message.
for message in consumer:
    time = datetime.datetime.fromtimestamp(message.timestamp / 1000.0)
    print(f"Received message at {time} (Timestamp: {message.timestamp})")
    # message.value is already a VehicleData object; uncomment to print it as JSON:
    # print(json.dumps(MessageToDict(message.value)))
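The loop above relies on kafka-python's default auto-commit, never closes the consumer, and prints local, timezone-naive times. If you want offsets committed only after a message has been handled, one option is to create the consumer with enable_auto_commit=False and commit explicitly. The sketch below assumes that setting; handle_message is a hypothetical placeholder for your own processing, and run_consumer only uses the consumer's iteration, commit() and close() methods, so it can also be exercised with a stand-in object.

```python
import datetime
from typing import Any, Callable, Optional


def kafka_time_utc(timestamp_ms: int) -> datetime.datetime:
    """Convert a Kafka record timestamp (ms since the epoch) to an aware UTC datetime."""
    return datetime.datetime.fromtimestamp(timestamp_ms / 1000.0, tz=datetime.timezone.utc)


def handle_message(message: Any) -> None:
    """Hypothetical handler: replace with your own processing of message.value."""
    print(f"Received message at {kafka_time_utc(message.timestamp).isoformat()}")


def run_consumer(consumer: Any,
                 handle: Callable[[Any], None] = handle_message,
                 max_messages: Optional[int] = None) -> int:
    """Handle messages, committing after each one; always close the consumer."""
    processed = 0
    try:
        for message in consumer:
            handle(message)
            consumer.commit()  # reached only if handle() did not raise
            processed += 1
            if max_messages is not None and processed >= max_messages:
                break
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the loop; already-handled messages were committed
    finally:
        consumer.close()
    return processed
```

With the client above created using enable_auto_commit=False, you would replace the for loop with run_consumer(consumer). Committing after every message keeps the logic simple but adds a broker round-trip per message; committing every N messages is a common trade-off.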
Consumer Group Naming Convention
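Consumer groups follow the pattern <username>*, so you can use any group ID that starts with your username. For example, with the username mobito, you can use mobito-test to test the connection and then switch to mobito as the consumer group in production. Because a group with no committed offsets starts from the position set by auto_offset_reset ('earliest' in the client above), the production group will consume messages from the earliest retained message.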
Replace mobito with your own username.
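The naming rule amounts to a prefix check, which you could mirror locally to catch a mistyped group ID before connecting. The helpers below are hypothetical and only reproduce the <username>* rule described above; they do not replace the access rules Mobito applies to your account.

```python
def is_allowed_group(group_id: str, username: str) -> bool:
    """True if group_id fits the <username>* pattern, i.e. starts with the username."""
    return bool(username) and group_id.startswith(username)


def make_test_group_id(username: str, suffix: str = "-test") -> str:
    """Hypothetical helper: derive a throwaway group ID for connection tests."""
    group_id = f"{username}{suffix}"
    if not is_allowed_group(group_id, username):
        raise ValueError(f"{group_id!r} does not start with {username!r}")
    return group_id
```

For example, you might pass group_id=make_test_group_id("mobito") (that is, "mobito-test") while testing, then group_id="mobito" in production.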