проверка значения позиции потока Kafka

#python #apache-spark #pyspark #apache-kafka

#python #apache-spark #pyspark #apache-kafka

Вопрос:

Я хочу проверить значение позиции потока Kafka, если равное значение имеет, например, «2», затем отобразить функцию запуска A, иначе запустить функцию B

 kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'video-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
# Group ID is completely arbitrary

lines = kafkaStream.map(lambda x: x[1])
 flag = lines.map(lambda line: line.split(",")).map(lambda v : v[0])

if  flag == "2":
    A = lines.map(lambda line: line.split(",")).map(lambda v: v[1])
    A.pprint()
else:
    lines.pprint()
  

Ответ №1:

flag == "2" никогда не будет истинным, потому что это объект Spark RDD, а не единственная строка.

Кроме того, Kafka потенциально имеет непрерывный поток записей, поэтому простая проверка второго столбца этой первой записи (при условии, что вы вызвали функцию collect() ) также не сработает.

Если вы хотите проверить наличие 2 в любой строке, вам придется ее отфильтровать

 lines = kafkaStream.map(lambda x: x[1])
flag = lines.map(lambda line: line.split(",")).filter(lambda columns: columns[1] == "2")
flag.pprint()
  

Если вы хотите просто использовать Kafka с помощью Python и проверять значения записей, вам не нужен Spark