79802364

Date: 2025-10-28 01:55:28
Score: 1.5
Natty:
Report link

Thanks — that’s a good, detailed description of a common Spark Structured Streaming issue with S3-backed checkpoints.

Let’s break it down clearly.


💥 Error Summary

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://checkpoint/state/0/7/1.delta

This means Spark’s state store checkpoint (HDFSStateStoreProvider) tried to load a Delta file (used for state updates) from your S3 checkpoint directory, but that .delta file disappeared or was never fully committed.


🧠 Why This Happens

This typically occurs because S3 is not a fully atomic file system, while Spark’s streaming state store logic assumes atomic rename and commit semantics like HDFS provides.

Common triggers:

  1. S3 eventual consistency — the file might exist but not yet visible when Spark tries to read it.

  2. Partially written or deleted checkpoint files — if an executor or the job failed mid-commit.

  3. Misconfigured committer or checkpoint file manager — the "magic committer" setup can cause issues with state store checkpoints (which aren’t output data but internal metadata).

  4. Concurrent writes to the same checkpoint folder — e.g., restarting the job without proper stop or cleanup.

  5. S3 lifecycle rules or cleanup deleting small files under checkpoint directory.


⚙️ Root Cause in This Case

You configured:

.config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.hadoop.fs.s3a.committer.name", "magic")
.config("spark.sql.streaming.checkpointFileManagerClass", "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")

These are correct for streaming output to S3 — but not ideal for Spark’s internal state store, which writes lots of small .delta files very frequently.
The “magic committer” tries to do atomic renames using temporary directories, but the state store’s file layout doesn’t cooperate well with it.

So you likely had a transient failure where 1.delta was being written, and then Spark failed before it was visible or committed — leaving a missing file reference.


✅ Recommended Fixes

1. Keep checkpoint/state on HDFS or local durable storage

If possible:

.option("checkpointLocation", "hdfs:///checkpoints/myjob")

or if on EMR:

.option("checkpointLocation", "s3://mybucket/checkpoints/") 
.config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

💡 Best practice:
Use S3 only for output sinks, not for streaming state checkpoints.

If you must use S3, use a consistent storage layer like:


2. If S3 must be used, disable the magic committer for checkpointing

Keep the committer for your output sink, but not for checkpoint/state store.

Try:

.config("spark.sql.streaming.checkpointFileManagerClass", "org.apache.spark.sql.execution.streaming.CheckpointFileManager")
.config("spark.hadoop.fs.s3a.committer.name", "directory")

and remove:

.config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")

This forces Spark to write checkpoints with simpler semantics (no magic rename tricks).


3. Check for concurrent jobs / restarts

Make sure no two jobs are writing to the same checkpoint directory.
If the old job didn’t shut down gracefully (stopGracefullyOnShutdown), the state might have been mid-write.


4. Recover

If the checkpoint is already corrupted, you may need to delete the affected checkpoint folder and restart from scratch (you’ll lose streaming state, but it will recover).


5. Upgrade or Patch

There were several S3A + Structured Streaming fixes in Spark 3.5+.
If you can, upgrade to Spark 3.5.x (lots of S3 committer and state store improvements).


🔍 Quick Checklist

ActionRecommendationCheckpoint directoryUse HDFS/local if possibleMagic committerDisable for checkpointsS3 lifecycle rulesEnsure they don’t delete small filesSpark versionPrefer ≥ 3.5.0Job restartsEnsure only one writer per checkpointAfter crashClear corrupted state folder before restart


If you share your deployment environment (EMR / K8s / Dataproc / on-prem cluster) I can give you a precise config for reliable S3 checkpointing.

Would you like me to show the updated Spark session builder config with safe S3 settings for streaming checkpoints?

Reasons:
  • Blacklisted phrase (0.5): Thanks
  • Whitelisted phrase (-1): in your case
  • Long answer (-1):
  • Has code block (-0.5):
  • Ends in question mark (2):
  • Unregistered user (0.5):
  • Low reputation (1):
Posted by: Mr GPT