Posting my own answer, since I managed to do what I was looking for (full code).
The transformation code looks like the following:
val usersRdd = sc.parallelize(users)
val ticketsRdd = sc.parallelize(tickets)
val partitioner = new DoubleDenormalizationPartitioner(p)
// Explosion factor as explained in the EDIT of my question.
val usersExplosionFactor = (2 * Math.floor(Math.sqrt(p)) - 1).toInt
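// e.g. for p = 9 partitions: 2 * floor(sqrt(9)) - 1 = 5 copies of each user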
val explodedUsers = usersRdd
  .flatMap { user =>
    (0 until usersExplosionFactor)
      .map(targetP => (user, -targetP))
  }
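// The negated index keeps user keys (non-positive second element)
// distinguishable from (assigneeId, requesterId) ticket keys in the partitioner.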
// Below we're partitioning each RDD using our custom partitioner (see the full code for the real implementation, and the sketch after this snippet)
val repartitionedUsers = explodedUsers
  .keyBy { case (user, targetP) => (user.id, targetP) }
  .partitionBy(partitioner)
val repartitionedTickets = ticketsRdd
  .keyBy(ticket => (ticket.assigneeId, ticket.requesterId))
  .partitionBy(partitioner)
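// zipPartitions (below) requires both RDDs to have the same number of
// partitions; since both were partitioned by the same partitioner,
// partition i of one lines up with partition i of the other.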
val denormalizedTickets = repartitionedTickets
  .map(_._2)
  .zipPartitions(repartitionedUsers.map(_._2._1), preservesPartitioning = true) { (tickets, usersI) =>
    // Here, thanks to the map we can denormalize the assignee and requester at the same time
    val users = usersI.map(u => (u.accountId, u.id) -> u.name).toMap
    tickets.map { ticket =>
      (
        ticket,
        users.get((ticket.accountId, ticket.assigneeId)),
        users.get((ticket.accountId, ticket.requesterId))
      )
    }
  }
  .mapPartitions(_.map { case (ticket, assignee, requester) =>
    (ticket.accountId, ticket.id, assignee, requester)
  })
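Since the partitioner itself is only in the linked full code, here is a minimal sketch of the idea as I understand it: the p partitions form a sqrt(p) x sqrt(p) grid, each ticket lands in the single cell addressed by (assignee row, requester column), and each copy of a user is routed to one cell of that user's row or column. The sketch assumes p is a perfect square and ids are non-negative Longs; the real implementation may differ.

import org.apache.spark.Partitioner

// Minimal sketch, not the actual implementation from the full code.
// Assumptions: p is a perfect square, ids are non-negative Longs, ticket
// keys are (assigneeId, requesterId), user keys are (userId, -targetP).
class DoubleDenormalizationPartitioner(p: Int) extends Partitioner {
  private val side = Math.floor(Math.sqrt(p)).toInt

  override def numPartitions: Int = side * side

  override def getPartition(key: Any): Int = key match {
    // User copy: the non-positive second element marks user keys.
    // Copy i covers the i-th cell of the user's row, then the
    // remaining cells of its column.
    case (userId: Long, targetP: Int) if targetP <= 0 =>
      val rowAndCol = (userId % side).toInt
      val i = -targetP
      if (i < side) rowAndCol * side + i // cells of the user's row
      else {
        // cells of the user's column, skipping the diagonal cell
        // already covered by the row (hence 2 * side - 1 copies)
        val r = if (i - side < rowAndCol) i - side else i - side + 1
        r * side + rowAndCol
      }
    // Ticket: a single cell, addressed by (assignee row, requester column).
    case (assigneeId: Long, requesterId: Long) =>
      ((assigneeId % side) * side + requesterId % side).toInt
  }
}

With this layout every ticket sits in exactly one cell together with a copy of both its assignee (same row) and its requester (same column), which is also where the 2 * floor(sqrt(p)) - 1 explosion factor comes from: one full row plus one full column, minus the shared diagonal cell.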
I tested the performance of my solution against DataFrame joins and RDD joins, and it did not fare as well as I hoped. Overall I suppose the usual advice "do not use RDDs unless you really know what you're doing" applies here (I don't really know what I'm doing, since this is the first time I'm using RDDs in an "advanced" way).
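For reference, the DataFrame baseline is essentially a double join, along these lines (a sketch; usersDf and ticketsDf are hypothetical names for the same data loaded as DataFrames):

// spark is the SparkSession
import spark.implicits._

val assignees = usersDf
  .select($"accountId", $"id".as("assigneeId"), $"name".as("assignee"))
val requesters = usersDf
  .select($"accountId", $"id".as("requesterId"), $"name".as("requester"))

val denormalized = ticketsDf
  .join(assignees, Seq("accountId", "assigneeId"), "left")
  .join(requesters, Seq("accountId", "requesterId"), "left")
  .select($"accountId", $"id", $"assignee", $"requester")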
I hope it can still help someone, or that at least someone finds this problem as interesting as I did.