#python #apache-spark #pyspark #apache-kafka
#python #apache-spark #pyspark #apache-kafka
Вопрос:
Я хочу проверить значение позиции потока Kafka, если равное значение имеет, например, «2», затем отобразить функцию запуска A, иначе запустить функцию B
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
'bootstrap.servers': 'localhost:9092',
'group.id': 'video-group',
'fetch.message.max.bytes': '15728640',
'auto.offset.reset': 'largest'})
# Group ID is completely arbitrary
lines = kafkaStream.map(lambda x: x[1])
flag = lines.map(lambda line: line.split(",")).map(lambda v : v[0])
if flag == "2":
A = lines.map(lambda line: line.split(",")).map(lambda v: v[1])
A.pprint()
else:
lines.pprint()
Ответ №1:
flag == "2"
никогда не будет истинным, потому что это объект Spark RDD, а не единственная строка.
Кроме того, Kafka потенциально имеет непрерывный поток записей, поэтому простая проверка второго столбца этой первой записи (при условии, что вы вызвали функцию collect() ) также не сработает.
Если вы хотите проверить наличие 2 в любой строке, вам придется ее отфильтровать
lines = kafkaStream.map(lambda x: x[1])
flag = lines.map(lambda line: line.split(",")).filter(lambda columns: columns[1] == "2")
flag.pprint()
Если вы хотите просто использовать Kafka с помощью Python и проверять значения записей, вам не нужен Spark