EMR on EKS with Spark Streaming
This project is developed with the AWS CDK in Python. It includes sample data, a Kafka producer simulator, and a consumer example that can be run with EMR on EC2 or EMR on EKS. Additionally, a few Kinesis examples are included for different use cases.
The infrastructure deployment includes the following:
- A new S3 bucket to store sample data and stream job code
- An EKS cluster v1.24 in a new VPC across 2 AZs
  - The cluster has 2 default managed node groups: an On-Demand node group that scales from 1 to 5 instances, and a Spot node group that scales from 1 to 30
  - It also has a Fargate profile labelled with the value serverless
- An EMR virtual cluster in the same VPC
  - The virtual cluster links to the emr namespace
  - The namespace accommodates two types of Spark jobs: jobs that run on the managed node groups and serverless jobs that run on Fargate
  - All EMR on EKS configurations are in place, including fine-grained access control for pods via the AWS-native IAM Roles for Service Accounts (IRSA) feature (a quick verification sketch follows this list)
- An MSK cluster in the same VPC with 2 brokers in total, running Kafka version 2.8.1
- A Cloud9 IDE as the command-line environment for the demo
  - The Kafka client tool will be installed on the Cloud9 IDE
- An EMR on EC2 cluster with managed scaling enabled
  - 1 primary and 1 core node, both r5.xlarge
  - Configured to run one Spark job at a time
  - Can scale from 1 to 10 core + task nodes
  - EFS mounted for a checkpointing test/demo (via a bootstrap action)
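As a quick check of the IRSA setup mentioned in the list above, the job execution service account in the emr namespace should carry an IAM role annotation. A minimal sketch, assuming the namespace name from the list (the exact service account name is environment-specific and is a placeholder here):
# list service accounts in the emr namespace and inspect the IRSA role annotation
kubectl get serviceaccount -n emr
kubectl describe serviceaccount <SERVICE_ACCOUNT_NAME> -n emr | grep eks.amazonaws.com/role-arn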
Spark examples - read stream from MSK
Spark consumer applications reading from Amazon MSK:
Spark examples - read stream from Kinesis
1. (Optional) Build a custom docker image
2. Run a job with the kinesis-sql connector
3. Run a job with Spark's DStream
Deploy Infrastructure
The provisioning takes about 30 minutes to complete. Two ways to deploy:
- AWS CloudFormation template (CFN)
- AWS Cloud Development Kit (AWS CDK).
CloudFormation Deployment
Region | Launch Template
---|---
US East (N. Virginia) |
- To launch in a different AWS Region, see the Customization section below, or use the CDK deployment option.
Customization
You can customize the solution, for example to target a different region, and then generate the CFN templates for that region:
export BUCKET_NAME_PREFIX=<my-bucket-name> # bucket where customized code will reside
export AWS_REGION=<your-region>
export SOLUTION_NAME=emr-stream-demo
export VERSION=v2.0.0 # version number for the customized code
cd data-on-eks/analytics/cdk/stream-emr-on-eks
./deployment/build-s3-dist.sh $BUCKET_NAME_PREFIX $SOLUTION_NAME $VERSION
# create the bucket where customized code will reside
aws s3 mb s3://$BUCKET_NAME_PREFIX-$AWS_REGION --region $AWS_REGION
# Upload deployment assets to the S3 bucket
aws s3 cp ./deployment/global-s3-assets/ s3://$BUCKET_NAME_PREFIX-$AWS_REGION/$SOLUTION_NAME/$VERSION/ --recursive --acl bucket-owner-full-control
aws s3 cp ./deployment/regional-s3-assets/ s3://$BUCKET_NAME_PREFIX-$AWS_REGION/$SOLUTION_NAME/$VERSION/ --recursive --acl bucket-owner-full-control
echo -e "\nIn web browser, paste the URL to launch the template: https://console.aws.amazon.com/cloudformation/home?region=$AWS_REGION#/stacks/quickcreate?stackName=emr-stream-demo&templateURL=https://$BUCKET_NAME_PREFIX-$AWS_REGION.s3.amazonaws.com/$SOLUTION_NAME/$VERSION/emr-stream-demo.template\n"
CDK Deployment
Prerequisites
Install the following tools:
- Python 3.6+
- Node.js 10.3.0+
- AWS CLI, configured by running aws configure
- CDK toolkit
- A one-off CDK bootstrap for the first-time deployment (see the sketch after this list)
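A minimal sketch of the last two prerequisites; the account ID and region values are placeholders:
# install the CDK toolkit globally via npm
npm install -g aws-cdk
# one-off bootstrap of the target account/region for CDK deployments
cdk bootstrap aws://<ACCOUNT_ID>/<AWS_REGION>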
Deploy
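The commands below assume you are inside the project directory. A minimal sketch of fetching the code first; the repository URL is an assumption based on the script URLs used later in this guide, so adjust it to wherever the code actually lives:
# clone the project and change into it (repository URL assumed)
git clone https://github.com/aws-samples/stream-emr-on-eks.git
cd stream-emr-on-eks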
python3 -m venv .env
source .env/bin/activate
pip install -r requirements.txt
cdk deploy
Post-deployment
The following post-deployment.sh script is written for Linux, not macOS. Modify the script if needed.
- Open the "Kafka Client" IDE in Cloud9 console. Create one if the Cloud9 IDE doesn't exist.
VPC prefix: 'emr-stream-demo'
Instance Type: 't3.small'
-
Turn off AWS managed temporary credentials in Cloud9:
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install --update
/usr/local/bin/aws cloud9 update-environment --environment-id $C9_PID --managed-credentials-action DISABLE
rm -vf ${HOME}/.aws/credentials
- Run the script to configure the Cloud9 IDE environment:
curl https://raw.githubusercontent.com/aws-samples/stream-emr-on-eks/main/deployment/app_code/post-deployment.sh | bash
- Wait for about 5 minutes, then check the MSK cluster status (a status-check sketch follows this list). Make sure it is active before sending data to the cluster.
- Launch a new terminal window in Cloud9 and send the sample data to MSK:
wget https://github.com/xuite627/workshop_flink1015-1/raw/master/dataset/nycTaxiRides.gz
zcat nycTaxiRides.gz | split -l 10000 --filter="kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list ${MSK_SERVER} --topic taxirides ; sleep 0.2" > /dev/null
- Launch a third terminal window and monitor the source MSK topic:
kafka_2.12-2.8.1/bin/kafka-console-consumer.sh \
--bootstrap-server ${MSK_SERVER} \
--topic taxirides \
--from-beginning
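For the MSK status check mentioned earlier in this section, a minimal sketch using the AWS CLI; wait until the demo cluster reports an ACTIVE state:
# list MSK clusters and show their state
aws kafka list-clusters --query 'ClusterInfoList[].{Name:ClusterName,State:State}' --output table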
MSK integration
1. Submit a job with EMR on EKS
- Sample job to consume the data stream from MSK
- Submit the job:
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name msk_consumer \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-5.33.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "s3://'$S3BUCKET'/app_code/job/msk_consumer.py",
"entryPointArguments":["'$MSK_SERVER'","s3://'$S3BUCKET'/stream/checkpoint/emreks","emreks_output"],
"sparkSubmitParameters": "--conf spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7 --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=2"}}' \
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.kubernetes.driver.podTemplateFile":"s3://'$S3BUCKET'/app_code/job/driver_template.yaml","spark.kubernetes.executor.podTemplateFile":"s3://'$S3BUCKET'/app_code/job/executor_template.yaml"
}
}
],
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/emreks-log/"}}
}'
Verify the job is running:
# check the job pods in EKS
kubectl get po -n emr
# verify in EMR console
# in Cloud9, run the consumer tool to check if any data is coming through on the target Kafka topic
kafka_2.12-2.8.1/bin/kafka-console-consumer.sh --bootstrap-server ${MSK_SERVER} --topic emreks_output --from-beginning
Cancel the long-running job (the job ID can be found in the job submission output or in the EMR console):
aws emr-containers cancel-job-run --virtual-cluster-id $VIRTUAL_CLUSTER_ID --id <YOUR_JOB_ID>
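If the job ID is not at hand, a minimal sketch for looking it up, filtering on the job name used above:
# list recent job runs in the virtual cluster to find the job id
aws emr-containers list-job-runs --virtual-cluster-id $VIRTUAL_CLUSTER_ID --query 'jobRuns[?name==`msk_consumer`].[id,state]' --output table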
2. EMR on EKS with Fargate
Run the same job on the same EKS cluster, but with the serverless option: the Fargate compute choice.
To ensure it is picked up by Fargate and not by the managed node group on EC2, we tag the Spark job with the serverless label, which was set up in the Fargate profile earlier:
--conf spark.kubernetes.driver.label.type=serverless
--conf spark.kubernetes.executor.label.type=serverless
Submit the job to Fargate:
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name msk_consumer_fg \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-5.33.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "s3://'$S3BUCKET'/app_code/job/msk_consumer.py",
"entryPointArguments":["'$MSK_SERVER'","s3://'$S3BUCKET'/stream/checkpoint/emreksfg","emreksfg_output"],
"sparkSubmitParameters": "--conf spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7 --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=2 --conf spark.kubernetes.driver.label.type=serverless --conf spark.kubernetes.executor.label.type=serverless"}}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/emreksfg-log/"}}}'
Verify the job is running on EKS Fargate
kubectl get po -n emr
# verify in EMR console
# in Cloud9, run the consumer tool to check if any data is coming through on the target Kafka topic
kafka_2.12-2.8.1/bin/kafka-console-consumer.sh \
--bootstrap-server ${MSK_SERVER} \
--topic emreksfg_output \
--from-beginning
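To confirm the pods actually landed on Fargate rather than on the EC2 managed node groups, a minimal sketch; Fargate-backed pods are scheduled onto nodes whose names start with fargate-:
# show which nodes the Spark pods are scheduled on
kubectl get pod -n emr -o wide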
3. (Optional) Submit a step to EMR on EC2
cluster_id=$(aws emr list-clusters --cluster-states WAITING --query 'Clusters[?Name==`emr-stream-demo`].Id' --output text)
# use a single broker endpoint (the second entry in the comma-separated MSK bootstrap server list)
MSK_SERVER=$(echo $MSK_SERVER | cut -d',' -f 2)
aws emr add-steps \
--cluster-id $cluster_id \
--steps Type=spark,Name=emrec2_stream,Args=[--deploy-mode,cluster,--conf,spark.cleaner.referenceTracking.cleanCheckpoints=true,--conf,spark.executor.instances=2,--conf,spark.executor.memory=2G,--conf,spark.driver.memory=2G,--conf,spark.executor.cores=2,--packages,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,s3://$S3BUCKET/app_code/job/msk_consumer.py,$MSK_SERVER,s3://$S3BUCKET/stream/checkpoint/emrec2,emrec2_output],ActionOnFailure=CONTINUE
Verify
# verify in EMR console
# in Cloud9, run the consumer tool to check if any data is coming through on the target Kafka topic
kafka_2.12-2.8.1/bin/kafka-console-consumer.sh \
--bootstrap-server ${MSK_SERVER} \
--topic emrec2_output \
--from-beginning
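You can also check the step status from the CLI; a minimal sketch using the cluster id captured above:
# list the steps on the EMR on EC2 cluster and show their state
aws emr list-steps --cluster-id $cluster_id --query 'Steps[].{Name:Name,State:Status.State}' --output table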
Kinesis integration
1. (Optional) Build a custom docker image
The jobs create and delete a Kinesis test stream on the fly via boto3, so a custom EMR on EKS docker image that includes the Python library is needed. The custom docker image is not compulsory if you don't need boto3 or the kinesis-sql connector.
Build an image based on the EMR on EKS 6.5 base image (a Dockerfile sketch follows the commands below):
export AWS_REGION=$(aws configure list | grep region | awk '{print $2}')
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
# log in to the EMR on EKS base-image registry hosted in us-west-2
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 895885662937.dkr.ecr.us-west-2.amazonaws.com
docker build -t emr6.5_custom .
# create ECR repo in current account
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name emr6.5_custom_boto3 --image-scanning-configuration scanOnPush=true --region $AWS_REGION
# push to ECR
docker tag emr6.5_custom $ECR_URL/emr6.5_custom_boto3
docker push $ECR_URL/emr6.5_custom_boto3
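The docker build step above expects a Dockerfile in the working directory. A minimal sketch of what it might contain, following the standard EMR on EKS custom-image pattern; the base image tag and the installed packages are assumptions, and the project ships its own Dockerfile:
# write a hypothetical Dockerfile based on the EMR on EKS 6.5 base image (assumed content)
cat > Dockerfile <<'EOF'
FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.5.0:latest
USER root
# install the Python library needed to create/delete the Kinesis test stream
RUN pip3 install boto3
USER hadoop:hadoop
EOF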
2. Use kinesis-sql connector
This demo uses the com.qubole.spark/spark-sql-kinesis_2.12/1.2.0-spark_3.0 connector to interact with Kinesis.
To enable job-level access control, i.e. the IRSA feature, we have forked the kinesis-sql git repo and recompiled a new jar after upgrading the AWS Java SDK. The custom docker build above picks up the upgraded connector automatically.
- Sample job to consume the data stream from Kinesis
- Submit the job:
export AWS_REGION=$(aws configure list | grep region | awk '{print $2}')
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name kinesis-demo \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.5.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "s3://'$S3BUCKET'/app_code/job/qubole-kinesis.py",
"entryPointArguments":["'${AWS_REGION}'","s3://'${S3BUCKET}'/qubolecheckpoint","s3://'${S3BUCKET}'/qubole-kinesis-output"],
"sparkSubmitParameters": "--conf spark.cleaner.referenceTracking.cleanCheckpoints=true"}}' \
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.kubernetes.container.image": "'${ECR_URL}'/emr6.5_custom_boto3:latest"
}
}
],
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/kinesis-fargate-log/"}
}
}'
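To check that the connector job is writing results, a minimal sketch that lists the output prefix passed as the third job argument above:
# the job writes results under the qubole-kinesis-output prefix passed in entryPointArguments
aws s3 ls s3://$S3BUCKET/qubole-kinesis-output/ --recursive | head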
3. Use Spark's DStream
This demo uses the spark-streaming-kinesis-asl_2.12 library to read from Kinesis. Check out Spark's official documentation. The syntax is slightly different from the spark-sql-kinesis approach: it operates at the RDD level.
- Sample job to consume the data stream from Kinesis
- Submit the job:
export AWS_REGION=$(aws configure list | grep region | awk '{print $2}')
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name kinesis-demo \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.5.0-latest \
--job-driver '{
"sparkSubmitJobDriver":{
"entryPoint": "s3://'$S3BUCKET'/app_code/job/pyspark-kinesis.py",
"entryPointArguments":["'${AWS_REGION}'","s3://'$S3BUCKET'/asloutput/"],
"sparkSubmitParameters": "--jars https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kinesis-asl_2.12/3.1.2/spark-streaming-kinesis-asl_2.12-3.1.2.jar,https://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.12.0/amazon-kinesis-client-1.12.0.jar"}}' \
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.kubernetes.container.image": "'${ECR_URL}'/emr6.5_custom_boto3:latest"
}
}
],
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/kinesis-fargate-log/"}
}
}'
Useful commands
- kubectl get pod -n emr : list running Spark jobs
- kubectl delete pod --all -n emr : delete all Spark jobs
- kubectl logs <pod name> -n emr : check logs against a pod in the emr namespace
- kubectl get node --label-columns=eks.amazonaws.com/capacityType,topology.kubernetes.io/zone : check EKS compute capacity types and AZ distribution
Clean up
Run the clean-up script with:
curl https://raw.githubusercontent.com/aws-samples/stream-emr-on-eks/main/deployment/app_code/delete_all.sh | bash
Then go to the CloudFormation console and manually delete any remaining resources if needed.