I ended up taking up @MatsLindh's suggestion and implemented a FastAPI endpoint that triggers a Scrapy spider, using Celery to queue the crawl requests.
The Celery task is as follows:
from celery import Celery
import os
import subprocess

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

celery_app = Celery(
    "scraper",
    broker=redis_url,
    backend=redis_url,
)

@celery_app.task
def run_spiderfool_task(start_page=1, number_pages=2):
    try:
        # Run the Scrapy spider with arguments
        command = [
            "scrapy", "crawl", "spiderfool",       # spider name
            "-a", f"start_page={start_page}",      # custom argument for start_page
            "-a", f"number_pages={number_pages}"   # custom argument for number_pages
        ]

        # Execute the command
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Wait for the process to finish
        stdout, stderr = process.communicate()

        # Check if there are any errors
        if process.returncode != 0:
            print(f"Error: {stderr.decode('utf-8')}")
        else:
            print(f"Spider completed successfully! Output:\n{stdout.decode('utf-8')}")
    except Exception as e:
        print(f"An error occurred: {e}")
Celery uses Redis as both the broker and the result backend. The process.communicate() call blocks until the crawler finishes, so the task does not return before the spider has completed.
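Note that the task only executes once a Celery worker is running against the same Redis instance, e.g. celery -A tasks worker --loglevel=info (this assumes the module above is saved as tasks.py; adjust -A to your actual module path).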
The FastAPI endpoints are as follows:
from fastapi import FastAPI, Query
from celery.result import AsyncResult

# Import the Celery task defined above (the module name "tasks" is an assumption; use your own layout)
from tasks import run_spiderfool_task

app = FastAPI()

@app.post("/trigger/spiderfool")
def trigger_spider(
    start_page: int = Query(1, ge=1),
    number_pages: int = Query(2, ge=1)
):
    # Queue the crawl on the Celery worker and return immediately with the task id
    spiderfool_task = run_spiderfool_task.delay(start_page, number_pages)
    return {"job_id": spiderfool_task.id, "message": "Started spider!"}

@app.get("/status/{job_id}")
def get_status(job_id: str):
    # Look up the task state and result in the Redis backend
    job_info = AsyncResult(job_id)
    return {
        "job_id": job_info.id,
        "job_status": job_info.status,
        "job_result": job_info.result
    }
I dockerized the app into three containers: the FastAPI app, the Celery worker, and Redis.
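For reference, a minimal docker-compose sketch of that layout (service names, module names, and the build context are assumptions, not my exact setup):

services:
  redis:
    image: redis:7
  worker:
    build: .
    command: celery -A tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
  api:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 8000
    environment:
      - REDIS_URL=redis://redis:6379
    ports:
      - "8000:8000"
    depends_on:
      - redis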
Now I can trigger spiders from FastAPI and monitor the job by its job ID.
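For example, a small polling client (the base URL assumes uvicorn is serving the app on port 8000, and requests is installed):

import time
import requests

BASE_URL = "http://localhost:8000"  # assumption: FastAPI served by uvicorn on port 8000

# Trigger the spider for pages 1-3
resp = requests.post(f"{BASE_URL}/trigger/spiderfool", params={"start_page": 1, "number_pages": 3})
job_id = resp.json()["job_id"]

# Poll the status endpoint until the Celery task finishes
while True:
    status = requests.get(f"{BASE_URL}/status/{job_id}").json()
    if status["job_status"] in ("SUCCESS", "FAILURE"):
        print(status)
        break
    time.sleep(5)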
Thanks @MatsLindh and @wRAR for the input.