79456079

Date: 2025-02-21 00:03:31
Score: 0.5

Posting my own answer, as I managed to do what I was looking for (full code).

The transformation code looks like the following:

val usersRdd = sc.parallelize(users)
val ticketsRdd = sc.parallelize(tickets)
val partitioner = new DoubleDenormalizationPartitioner(p)

// Explosion factor as explained in the EDIT of my question.
val usersExplosionFactor = (2 * Math.floor(Math.sqrt(p)) - 1).toInt

val explodedUsers = usersRdd
  .flatMap { user =>
    (0 until usersExplosionFactor)
      .map(targetP => (user, -targetP))
  }

// Below we're partitioning each RDD using our custom partitioner (see full code for implementation)

val repartitionedUsers = explodedUsers
  .keyBy { case (user, targetP) => (user.id, targetP) }
  .partitionBy(partitioner)

val repartitionedTickets = ticketsRdd
  .keyBy(ticket => (ticket.assigneeId, ticket.requesterId))
  .partitionBy(partitioner)

val denormalizedTickets = repartitionedTickets
  .map(_._2)
  .zipPartitions(repartitionedUsers.map(_._2._1), preservesPartitioning = true) { case (tickets, usersI) =>
    // Here, thanks to the map we can denormalize the assignee and requester at the same time
    val users = usersI.map(u => (u.accountId, u.id) -> u.name).toMap
    tickets.map { ticket =>
      (
        ticket,
        // Pass the key as an explicit tuple rather than relying on argument auto-tupling
        users.get((ticket.accountId, ticket.assigneeId)),
        users.get((ticket.accountId, ticket.requesterId))
      )
    }
  }
  .mapPartitions(_.map { case (ticket, assignee, requester) =>
    (ticket.accountId, ticket.id, assignee, requester)
  })
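
To make the explosion factor above more concrete, here is a small sketch in plain Scala (no Spark). It assumes the p partitions are laid out as a square grid of side floor(sqrt(p)), with each ticket placed in the cell (assignee row, requester column). That layout, and the names `ExplosionSketch`, `side`, `ticketCell` and `userCells`, are assumptions made for illustration; the real `DoubleDenormalizationPartitioner` is in the linked full code and may differ, including in how it uses the negative `targetP` values.

```scala
object ExplosionSketch {
  // Side of the square grid of partitions: floor(sqrt(p)).
  def side(p: Int): Int = math.floor(math.sqrt(p.toDouble)).toInt

  // Same value as usersExplosionFactor in the answer: 2 * floor(sqrt(p)) - 1.
  def explosionFactor(p: Int): Int = 2 * side(p) - 1

  // ASSUMPTION (hypothetical layout): a ticket goes to the cell whose row is
  // derived from the assignee and whose column is derived from the requester.
  // Ids are assumed to be non-negative.
  def ticketCell(assigneeId: Int, requesterId: Int, s: Int): Int =
    (assigneeId % s) * s + (requesterId % s)

  // A user can be an assignee (anywhere in its row) or a requester
  // (anywhere in its column), so it is copied to the whole row and the
  // whole column. They share one cell, hence s + s - 1 copies.
  def userCells(userId: Int, s: Int): Set[Int] = {
    val k = userId % s
    val row = (0 until s).map(col => k * s + col)
    val column = (0 until s).map(r => r * s + k)
    (row ++ column).toSet
  }
}
```

Under these assumptions, with p = 16 the grid is 4 x 4 and `ExplosionSketch.userCells(6, 4)` contains 7 cells, which matches `explosionFactor(16)`. Any ticket where user 6 is the assignee or the requester lands in one of those cells.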

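The function passed to `zipPartitions` only ever sees two plain iterators, so its logic can be tried out without a Spark cluster. The sketch below reproduces that per-partition step on ordinary Scala collections. The case classes and their `Int` field types are assumptions for illustration (the real `User` and `Ticket` definitions are in the full code), and `PartitionLocalJoin.denormalize` is a hypothetical name.

```scala
// ASSUMPTION: simplified stand-ins for the real User and Ticket classes.
final case class User(accountId: Int, id: Int, name: String)
final case class Ticket(accountId: Int, id: Int, assigneeId: Int, requesterId: Int)

object PartitionLocalJoin {
  // Mirrors the body of the zipPartitions call: build one lookup map from the
  // users present in this partition, then resolve both the assignee and the
  // requester of each ticket against it.
  def denormalize(
      tickets: Iterator[Ticket],
      usersI: Iterator[User]
  ): Iterator[(Int, Int, Option[String], Option[String])] = {
    val users = usersI.map(u => (u.accountId, u.id) -> u.name).toMap
    tickets.map { ticket =>
      (
        ticket.accountId,
        ticket.id,
        users.get((ticket.accountId, ticket.assigneeId)),
        users.get((ticket.accountId, ticket.requesterId))
      )
    }
  }
}
```

A lookup returns `None` when the matching user is not in the same partition, so a partitioner that misplaces users would show up as missing names rather than as an error.
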
I compared the performance of my solution against DataFrame joins and RDD joins, and it does not run as smoothly. Overall I imagine that the advice "do not use RDDs unless you really know what you're doing" applies here (I don't really know what I'm doing here; this is my first time using RDDs in an "advanced" way).

I hope it can still help someone, or that at least someone finds this problem interesting (I did).

Reasons:
  • Blacklisted phrase (0.5): thanks
  • Long answer (-1):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (1):
Posted by: Florent