Exception when connecting with kafka topic #70

@Wassim-Mohsni

Hello, I'm using the Spark 2.4.1 image. I wrote a simple Scala application that reads data from a Kafka topic and prints it to the console:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._

object StreamHandler {
  def main(args: Array[String]): Unit = {

    // initialize Spark
    val spark = SparkSession
      .builder()
      .appName("KafkaStreaming")
      .getOrCreate()

    // avoid warnings
    import spark.implicits._

    // read from Kafka
    val inputDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sometopic")
      .option("startingOffsets", "earliest")
      .option("partition.assignment.strategy", "range")
      .load()

    val expandedDF = inputDF.selectExpr("CAST(value AS STRING)")
   
    val query = expandedDF
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

When I run spark-submit, I get this exception:

2021-07-27 19:30:19,993 INFO streaming.MicroBatchExecution: Using MicroBatchReader [KafkaV2[Subscribe[plates-raw]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@61532d01]
2021-07-27 19:30:20,016 INFO streaming.MicroBatchExecution: Starting new streaming query.
2021-07-27 19:30:20,030 INFO streaming.MicroBatchExecution: Stream started from {}
2021-07-27 19:30:20,172 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)

This is the spark-submit command:

docker exec -it spark-master ./bin/spark-submit \
  --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0" \
  --class StreamHandler --master spark://master:7077 --executor-memory 2G --total-executor-cores 2 \
  /tmp/data/streamhandler_2.11-0.1.jar

I tried the Spark 2.4.1 image and still got the same error. The app works normally with the Spark 2.2.0 image, but I need Spark 2.4 or above so that I can later save the topic values in Cassandra with .foreachBatch.
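
For context, here is a rough sketch of the kind of .foreachBatch sink I have in mind. It is only an illustration, not working code from my project: it assumes the DataStax spark-cassandra-connector is on the classpath, and the keyspace and table names (`my_keyspace`, `my_table`) are placeholders for a table with a single `value` text column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamToCassandra {

  // Writes one micro-batch to Cassandra.
  // ASSUMPTION: "my_keyspace" / "my_table" are placeholder names, and the
  // format string relies on the spark-cassandra-connector being available.
  // Typed explicitly as a Scala function so the Scala overload of
  // foreachBatch is selected.
  val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
      .mode("append")
      .save()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("KafkaToCassandra")
      .getOrCreate()

    // Same Kafka source as in StreamHandler, keeping only the message value
    val values = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sometopic")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")

    // foreachBatch only exists from Spark 2.4 onward, hence the version requirement
    val query = values
      .writeStream
      .foreachBatch(writeBatch)
      .start()

    query.awaitTermination()
  }
}
```

This is why staying on the 2.2.0 image is not an option for me.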

Any help would be greatly appreciated!
