Thanks — that’s a good, detailed description of a common Spark Structured Streaming issue with S3-backed checkpoints.
Let’s break it down clearly.
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://checkpoint/state/0/7/1.delta
This means Spark’s state store (the default HDFSBackedStateStoreProvider behind your stateful operators) tried to load a delta file (an incremental batch of state updates) from your S3 checkpoint directory, but that .delta file disappeared or was never fully committed.
This typically happens because S3 is an object store without atomic rename, while Spark’s streaming state store logic assumes the atomic rename-and-commit semantics that HDFS provides.
Common triggers:
S3 eventual consistency — on older setups or third-party S3-compatible stores, a file could exist but not yet be visible when Spark tried to read it (AWS S3 itself has been strongly consistent since late 2020, so this is less likely today).
Partially written or deleted checkpoint files — if an executor or the job failed mid-commit.
Misconfigured committer or checkpoint file manager — the "magic committer" setup can cause issues with state store checkpoints (which aren’t output data but internal metadata).
Concurrent writes to the same checkpoint folder — e.g., restarting the job without proper stop or cleanup.
S3 lifecycle rules or cleanup deleting small files under checkpoint directory.
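Before changing any configuration, it can help to confirm what actually exists under the state directory. A minimal diagnostic sketch, assuming the s3a://checkpoint/state/0/7/ path from your error and whatever S3 credentials your cluster already provides:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Partition 7 of stateful operator 0, taken from the error message.
val statePath = new Path("s3a://checkpoint/state/0/7/")
val fs: FileSystem = statePath.getFileSystem(new Configuration())

// You should see N.delta files (and periodic N.snapshot files); a gap such as a
// missing 1.delta confirms the state store is referencing a file that is gone.
fs.listStatus(statePath).sortBy(_.getPath.getName).foreach { status =>
  println(f"${status.getPath.getName}%-20s ${status.getLen}%10d bytes")
}
```

If 1.delta genuinely isn’t there, the failure happened on the write side rather than the read side.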
You configured:
.config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.hadoop.fs.s3a.committer.name", "magic")
.config("spark.sql.streaming.checkpointFileManagerClass", "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
These are correct for streaming output to S3 — but not ideal for Spark’s internal state store, which writes lots of small .delta files very frequently.
The “magic committer” defers visibility by writing under special __magic paths and only completing the multipart upload at commit time; it is designed for job output, and the state store’s file layout and constant small writes don’t cooperate well with it.
So you likely had a transient failure where 1.delta was being written, and then Spark failed before it was visible or committed — leaving a missing file reference.
If possible, point the checkpoint location at HDFS (or another real file system):
.option("checkpointLocation", "hdfs:///checkpoints/myjob")
or, if you are on EMR and must keep checkpoints on S3, use the EMRFS s3:// scheme with the default state store provider:
.option("checkpointLocation", "s3://mybucket/checkpoints/")
.config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
💡 Best practice:
Use S3 only for output sinks, not for streaming state checkpoints.
If you cannot avoid S3 entirely, use a storage layer with stronger guarantees, for example:
Delta Lake’s S3 + DynamoDB log store for coordinated commits (not applicable in your case)
HDFS or an EBS-backed file system for checkpoints/state, keeping S3 for sink data only
Keep the committer for your output sink, but not for checkpoint/state store.
Try switching the committer used for your output sink:
.config("spark.hadoop.fs.s3a.committer.name", "directory")
and remove these two settings, so checkpoints fall back to Spark’s default file-system based checkpoint manager:
.config("spark.sql.streaming.checkpointFileManagerClass", "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
.config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
(Don’t set checkpointFileManagerClass to org.apache.spark.sql.execution.streaming.CheckpointFileManager itself; that is an abstract interface, not a usable implementation.)
This lets Spark write checkpoints with plain file-system semantics instead of routing them through committer machinery.
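As a rough sketch (not a drop-in replacement for your exact setup), the trimmed-down builder could look like this, keeping the S3A committer only for sink output:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("streaming-job")  // placeholder name
  // Committer settings still apply to data written to S3A by your sink.
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  // Deliberately NOT set: spark.sql.streaming.checkpointFileManagerClass and
  // spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled, so checkpoints go
  // through Spark's default file-system based checkpoint manager.
  .getOrCreate()
```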
Make sure no two jobs are writing to the same checkpoint directory.
If the old job didn’t shut down cleanly (in Structured Streaming that means calling query.stop(); the stopGracefullyOnShutdown flag belongs to the old DStream API), the state might have been mid-write.
If the checkpoint is already corrupted, you may need to delete the affected checkpoint folder and restart from scratch (you’ll lose streaming state, but it will recover).
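If you do end up wiping it, here is a minimal sketch using the Hadoop FileSystem API; the path is a placeholder for your checkpoint root, and deleting it removes offsets, commits, and state, so the query restarts from scratch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder: point this at the checkpoint directory of the affected query.
val checkpointRoot = new Path("s3a://mybucket/checkpoints/myjob")
val fs = checkpointRoot.getFileSystem(new Configuration())

// recursive = true removes offsets/, commits/, state/, metadata, etc.
fs.delete(checkpointRoot, true)
```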
There were several S3A and Structured Streaming fixes in recent releases, so if you can, upgrade to Spark 3.5.x, which includes S3 committer and state store improvements.
| Action | Recommendation |
| --- | --- |
| Checkpoint directory | Use HDFS/local if possible |
| Magic committer | Disable for checkpoints |
| S3 lifecycle rules | Ensure they don’t delete small files |
| Spark version | Prefer ≥ 3.5.0 |
| Job restarts | Ensure only one writer per checkpoint |
| After crash | Clear corrupted state folder before restart |
If you share your deployment environment (EMR / K8s / Dataproc / on-prem cluster) I can give you a precise config for reliable S3 checkpointing.
Would you like me to show the updated Spark session builder config with safe S3 settings for streaming checkpoints?