79514721

Date: 2025-03-17 13:35:08
Score: 0.5

This is quite an interesting problem.

When I need loop-like behavior in Spark, I usually collect all the rows into ordered arrays and then run an aggregation over them: for every element (equivalent to a row) I update the state based on some logic and save it afterwards.

I managed to make it work for the example you sent, with one slight difference: I calculate the actual median instead of using percentile_approx(), which doesn't give the exact median. A variation of this solution can use percentile_approx as well.

I'll guide you through my solution, using your example:

Algorithm:
Input: df(id, value)
Output: df(id, median_of_last_5_values)
Step 1: Combine the data frame into one row (id_array, values_array)
Step 2: Iterate through values using an aggregator that checks every new value in the list and decides whether we keep it
    array = last array in the accumulator
    We have two cases:
        if size(array) <= 5: try_array = array + next_element
        else: try_array = remove_first_element(array) + next_element
    We compute the median of try_array, and:
        if median > 35: append array (unchanged) to the accumulator, rejecting the new value
        else: append try_array to the accumulator, accepting the new value
    // The last array in the accumulator is then used in the next iteration.
Step 3: Post-processing of the result
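
To make the steps above concrete, here is a minimal plain-Python sketch of the same loop, without Spark. The function name simulate_accumulator and its parameters are hypothetical; max_len=6 mirrors the "size(array) <= 5" rule above (a window grows to six elements before the oldest one is dropped), and threshold=35 comes from the example.

```python
from statistics import median


def simulate_accumulator(values, max_len=6, threshold=35):
    """Replay Step 2 in plain Python and return one window per input value."""
    acc = [[]]  # the accumulator starts with a single empty window
    for x in values:
        last = acc[-1]
        if len(last) < max_len:
            # Window not full yet: just append the new value.
            candidate = last + [x]
        else:
            # Window full: drop the oldest value, then append.
            candidate = last[1:] + [x]
        if median(candidate) > threshold:
            acc.append(last)       # reject x, keep the previous window
        else:
            acc.append(candidate)  # accept x
    return acc[1:]  # Step 3: drop the initial empty window


windows = simulate_accumulator([10, 20, 30, 40, 50, 60, 70])
```

For the first seven values of the example, the last two windows are both [10, 20, 30, 40, 50, 60], because 70 would push the median above 35.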

How it would look in your example:
Step 1: Combining values in one row:

[Image: combined values]

Step 2:
First iteration: Value 10, accumulator=[[]]
array = []
try_array = [] + 10 = [10]
median < 35 => return [10] => accumulator = [[], [10]]
Second iteration: Value 20
array = [10]
try_array = [10] + 20 = [10,20]
Median < 35 => return [10,20] => accumulator = [[],[10],[10,20]]
... skipping to iteration 7
Seventh iteration: Value 70, accumulator = [[],[10],[10,20],[10,20,30],[10,20,30,40],[10,20,30,40,50],[10,20,30,40,50,60]]
array = [10,20,30,40,50,60]
try_array = [20,30,40,50,60] + 70 = [20,30,40,50,60,70]
Median = 45 > 35 => return [10,20,30,40,50,60]
=> accumulator = [[],[10],[10,20],[10,20,30],[10,20,30,40],[10,20,30,40,50],[10,20,30,40,50,60],[10,20,30,40,50,60]]

Now, the implementation:

Step 1:

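from pyspark.sql.functions import lit, collect_list

# Collapse the whole data frame into one row holding two parallel arrays.
# Note: collect_list makes no ordering guarantee, so check that both arrays are in id order.
df = df.withColumn("dummy", lit(1)).groupBy("dummy").agg(
    collect_list("id").alias("id_list"),
    collect_list("value").alias("values_list")
)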

Step 2: I used this calculation for median (in Spark SQL):

CASE WHEN size(preceding_values) % 2 == 0 then (array_sort(preceding_values)[cast(size(preceding_values)/2 as int)]  + array_sort(preceding_values)[cast(size(preceding_values)/2 as int)-1])/2
ELSE array_sort(preceding_values)[cast(size(preceding_values)/2 as int)]
END

But using this inside the aggregation generates messy code, so I would do the median calculation of an array using a UDF, such as:

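from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def median(arr):
    # Exact median of a list; None for an empty or null list.
    if not arr:
        return None
    sorted_arr = sorted(arr)
    n = len(sorted_arr)
    if n % 2 == 1:
        return float(sorted_arr[n // 2])
    else:
        return float((sorted_arr[n // 2 - 1] + sorted_arr[n // 2]) / 2)

median_udf = udf(median, FloatType())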

and then only use this function directly in the calculations. The code:

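from pyspark.sql.functions import expr
from pyspark.sql.types import FloatType

# Register the function under a SQL name so that expr() can call it
# (assumes an active SparkSession named `spark`). If your Spark version
# rejects Python UDFs inside the lambda, inline the SQL CASE median instead.
spark.udf.register("median_udf", median, FloatType())

# slice(arr, 2, size(arr) - 1) drops only the first element; array_remove
# would also drop any later duplicates of that value.
aggregation_logic = expr("""
    aggregate(
        values_list,
        cast(array(array()) as array<array<int>>),
        (acc, x) ->
            CASE
                WHEN size(acc[size(acc) - 1]) > 5
                THEN (
                    CASE
                        WHEN median_udf(array_append(slice(acc[size(acc) - 1], 2, size(acc[size(acc) - 1]) - 1), x)) > 35
                        THEN array_append(acc, acc[size(acc) - 1])
                        ELSE array_append(acc, array_append(slice(acc[size(acc) - 1], 2, size(acc[size(acc) - 1]) - 1), x))
                    END
                )
                ELSE (
                    CASE
                        WHEN median_udf(array_append(acc[size(acc) - 1], x)) > 35
                        THEN array_append(acc, acc[size(acc) - 1])
                        ELSE array_append(acc, array_append(acc[size(acc) - 1], x))
                    END
                )
            END
    )
""")

df = df.withColumn("considered_arrays", aggregation_logic)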
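
In case a Python UDF is not accepted inside the aggregate lambda in your environment, the same expression can be assembled from the CASE-based median in plain Spark SQL. The following is only a sketch: the helper names median_sql and build_aggregate_sql are hypothetical, it uses concat and slice instead of array_append and array_remove, and it assumes values_list holds int values.

```python
def median_sql(arr: str) -> str:
    """Return a Spark SQL snippet for the exact median of the array expression `arr`."""
    mid = f"cast(size({arr}) / 2 as int)"
    srt = f"array_sort({arr})"
    return (
        f"CASE WHEN size({arr}) % 2 = 0 "
        f"THEN ({srt}[{mid}] + {srt}[{mid} - 1]) / 2 "
        f"ELSE {srt}[{mid}] END"
    )


def build_aggregate_sql(threshold: int = 35, max_len: int = 5) -> str:
    """Build the aggregate(...) expression as a string, without any UDF."""
    last = "acc[size(acc) - 1]"
    # Candidate window: append x, dropping the oldest element once the window is full.
    candidate = (
        f"CASE WHEN size({last}) > {max_len} "
        f"THEN concat(slice({last}, 2, size({last}) - 1), array(x)) "
        f"ELSE concat({last}, array(x)) END"
    )
    return (
        "aggregate(values_list, cast(array(array()) as array<array<int>>), "
        f"(acc, x) -> CASE WHEN ({median_sql(candidate)}) > {threshold} "
        f"THEN concat(acc, array({last})) "
        f"ELSE concat(acc, array({candidate})) END)"
    )


aggregate_sql = build_aggregate_sql()
# With Spark available, the string would be passed to expr(), e.g.:
# df.withColumn("considered_arrays", expr(aggregate_sql))
```

The generated string is long, but keeping the median in one helper avoids writing the CASE expression by hand in each branch.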

Your result at this stage should look like this (in the example):

[Image: arrays to be considered]

Step 3:
We have 12 ids and 12 values, but 13 elements in considered_arrays, because of the initial empty array in the accumulator. We drop that first element (slicing rather than array_remove, which would also delete any later empty arrays):

df = df.withColumn("considered_arrays", expr("slice(considered_arrays, 2, size(considered_arrays) - 1)"))

Then to flatten the results, use the following:

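from pyspark.sql.functions import explode, expr
# arrays_zip pairs each id with its array; explode turns each pair into its own row.
df = df.select("id_list", "considered_arrays") \
       .withColumn("result_struct", explode(expr("arrays_zip(id_list, considered_arrays)"))) \
       .select("result_struct")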

And finally, calculate medians:

from pyspark.sql.functions import col
result_df = df.withColumn("id", col("result_struct.id_list")) \
              .withColumn("median", median_udf(col("result_struct.considered_arrays")))

This solution uses mostly Spark built-in functionality, and it's not efficient, especially for huge datasets. Keep in mind that because I'm reducing everything to one row, these arrays will be huge and the execution will be sequential. With only one row there is no parallelization across workers, so scaling out won't help much here; scaling up may help only if you run into memory issues.

If you want a more scalable solution, try implementing similar logic entirely in UDFs, or partition the data using a dummy column and find a way to keep data continuity between the groups, depending on your data. The latter would be very hard, but also extremely beneficial: you would have smaller arrays to work with on each machine, and distributed execution, with each worker assigned a group.
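
As a rough illustration of the "completely using UDFs" option, the whole loop can live in one Python function that takes the collected values and returns the medians directly. This is a sketch under assumptions: the names accepted_medians and make_medians_udf are hypothetical, max_len=6 and threshold=35 mirror the example above, and the Spark import is deferred so the core function can be checked without a cluster.

```python
from statistics import median


def accepted_medians(values, max_len=6, threshold=35.0):
    """Return, for each input value, the median of the window kept after seeing it."""
    window, out = [], []
    for x in values:
        # Append x, dropping the oldest value once the window is full.
        candidate = window + [x] if len(window) < max_len else window[1:] + [x]
        if median(candidate) <= threshold:
            window = candidate  # accept x; otherwise the old window is kept
        out.append(float(median(window)) if window else None)
    return out


def make_medians_udf():
    """Wrap accepted_medians as a Spark UDF (requires pyspark to be installed)."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, DoubleType
    return udf(accepted_medians, ArrayType(DoubleType()))


medians = accepted_medians([10, 20, 30, 40, 50, 60, 70])
```

With Spark available, something like df.withColumn("medians", make_medians_udf()("values_list")) would replace Step 2 and the median part of Step 3. It still runs on a single row, so it simplifies the code rather than removing the scalability limit.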

Reasons:
  • RegEx Blacklisted phrase (1): I want
  • Long answer (-1):
  • Has code block (-0.5):
  • Low reputation (1):
Posted by: user29983593