Как объединить действие pyspark «с колонкой» — Python 3?

#python #unit-testing #pyspark #apache-spark-sql #python-unittest

Вопрос:

Я изучаю pyspark, у меня есть функция:

 import re

def function_1(string):
    new_string = re.sub(r"!", " ", string)
    return new_string


udf_function_1 = udf(lambda s: function_1(s), StringType())


def function_2(data):
    new_data = data 
        .withColumn("column_1", udf_function_1("column_1"))
    return new_data
 

Мой вопрос в том, как написать unittest для function_2() на Python.

Ответ №1:

в чем именно вы хотите протестировать function_2 ?

Ниже приведен простой тест, сохраненный в файле под названием sample_test.py . Я использовал pytest , но вы можете использовать очень похожий код в unittest.

 # sample_test.py

from pyspark import sql


spark = sql.SparkSession.builder 
        .appName("local-spark-session") 
        .getOrCreate()
        
def test_create_session():
    assert isinstance(spark, sql.SparkSession) == True
    assert spark.sparkContext.appName == 'local-spark-session'

def test_spark_version():
    assert spark.version == '3.1.2'
 

запуск теста…

 C:UsersuserDesktop>pytest -v sample_test.py
============================================= test session starts =============================================
platform win32 -- Python 3.6.7, pytest-6.2.5, py-1.10.0, pluggy-1.0.0 -- c:usersuserappdatalocalprogramspythonpython36python.exe
cachedir: .pytest_cache
rootdir: C:UsersuserDesktop
collected 2 items

sample_test.py::test_create_session PASSED                                                               [ 50%]
sample_test.py::test_spark_version PASSED                                                                [100%]

============================================== 2 passed in 4.81s ==============================================