79244641

Date: 2024-12-02 15:34:35
Score: 0.5
Natty:
Report link

The answer from @samhita works, but it is much slower when the dataset is big.

I also added support for lists.

from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField
from typing import Any

# Session used to run this example.
spark = (
    SparkSession.builder
    .appName("replace_null_example")
    .getOrCreate()
)

# Sample DataFrame: five identical rows in which every leaf value is the
# literal string "null" (a string, not a real NULL).
data = [
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}})
    for _ in range(5)
]

# Schema: a top-level string column plus a struct column that holds a string
# and a further struct, so string leaves occur at three nesting depths.
schema = (
    StructType()
    .add("a_string", StringType(), True)
    .add(
        "my_struct",  # first-level struct
        StructType()
        .add("struct_string", StringType(), True)  # string inside my_struct
        .add(
            "nested_struct",  # struct nested inside my_struct
            StructType().add("nested_field", StringType(), True),
            True,
        ),
        True,
    )
)

df = spark.createDataFrame(data, schema)

def remplace_null_values(line: Row) -> dict[str, Any]:
    """Return the row as a dict with every "null" string replaced by None.

    The row is first converted with ``line.asDict(True)``. The resulting
    structure is then walked: dicts and lists are rebuilt with their contents
    cleaned, any value equal to the string "null" becomes ``None``, and every
    other value is returned unchanged. Dictionary keys are never modified.
    """

    def scrub(value):
        # Rebuild containers so nested values are cleaned at any depth.
        if isinstance(value, dict):
            return {name: scrub(item) for name, item in value.items()}
        if isinstance(value, list):
            return [scrub(element) for element in value]
        # Leaf value: only the exact string "null" is turned into None.
        return None if value == "null" else value

    return scrub(line.asDict(True))

# Clean every row on the RDD side, then rebuild a DataFrame with the
# original schema.
new_df: DataFrame = df.rdd.map(remplace_null_values).toDF(df.schema)

# Rows whose top-level string is still non-null after the replacement.
df_astring = new_df.where(col("a_string").isNotNull())
print("My df_astring")
df_astring.show(truncate=False)

# Rows whose deepest nested field is still non-null after the replacement.
df_struct_string = new_df.where(
    col("my_struct.nested_struct.nested_field").isNotNull()
)
print("My df_struct_string")
df_struct_string.show(truncate=False)
Reasons:
  • Long answer (-1):
  • Has code block (-0.5):
  • User mentioned (1): @samhita
  • Self-answer (0.5):
  • Low reputation (0.5):
Posted by: jeremie bergeron