#!/usr/bin/env python3
import requests
import json
from typing import Dict, Any, Optional
# Endpoint that run_scraper() POSTs the scraper-run request to.
API_URL = "https://openapi.coreclaw.com/api/v1/scraper/run"
# API key sent in the "api-key" request header; replace the placeholder
# with a real key before running this script.
API_KEY = "<YOUR_API_KEY>"
# HTTP request timeout in seconds, passed as `timeout=` to requests.post().
TIMEOUT = 30
def _failure_result(message: str) -> Dict[str, Any]:
    """Build the failure-shaped result dict returned by run_scraper()."""
    return {
        "success": False,
        "run_slug": None,
        "error": message,
    }


def run_scraper(params: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """POST a scraper-run request to API_URL and summarize the outcome.

    Network failures, non-200 responses, undecodable bodies and business
    errors reported by the API are all converted into a failure result
    instead of being raised.

    Args:
        params: Request payload; serialized as the JSON body of the POST.
        api_key: Value sent in the ``api-key`` request header.

    Returns:
        A dict with three keys:
        ``success`` (bool), ``run_slug`` (the ``data.run_slug`` value from
        the response, or ``None`` when absent or on failure) and ``error``
        (a human-readable message on failure, ``None`` on success).
    """
    headers = {
        "api-key": api_key,
        "Content-Type": "application/json",
    }

    # Keep the try body limited to the call that can raise network errors.
    try:
        response = requests.post(
            API_URL,
            headers=headers,
            json=params,
            timeout=TIMEOUT,
        )
    except requests.exceptions.Timeout:
        return _failure_result(f"Request timeout after {TIMEOUT} seconds")
    except requests.exceptions.RequestException as e:
        return _failure_result(f"Request error: {e}")

    if response.status_code != 200:
        return _failure_result(
            f"HTTP error: {response.status_code} - {response.text}"
        )

    # Every JSON decode error response.json() can raise (stdlib json,
    # simplejson, or requests' own JSONDecodeError) is a ValueError. Catching
    # it here keeps bad JSON from being reported as a generic request error,
    # since requests' JSONDecodeError is also a RequestException.
    try:
        result = response.json()
    except ValueError as e:
        return _failure_result(f"JSON decode error: {e}")

    # A JSON array/string/number has no .get(); report it instead of crashing.
    if not isinstance(result, dict):
        return _failure_result(
            "Unexpected response format: expected a JSON object, "
            f"got {type(result).__name__}"
        )

    # Pull values into locals so the f-string needs no nested double quotes
    # (nested same-type quotes are a SyntaxError before Python 3.12).
    code = result.get("code")
    if code != 0:
        message = result.get("message", "Unknown error")
        return _failure_result(f"Business error: {message} (code: {code})")

    # "data" may be missing or null; either way there is no run_slug to read.
    data = result.get("data")
    run_slug = data.get("run_slug") if isinstance(data, dict) else None
    return {
        "success": True,
        "run_slug": run_slug,
        "error": None,
    }
def main():
    """Submit one hard-coded sample scraper job and print the outcome.

    Builds the request payload, sends it through run_scraper() using the
    module-level API_KEY, and prints either the returned run ID or the
    error message.
    """
    # Resource and limit settings for the run.
    system_settings = {
        "proxy_region": "",
        "cpus": 0.125,
        "memory": 512,
        "execute_limit_time_seconds": 1800,
        "max_total_charge": 0,
        "max_total_traffic": 0,
    }
    # The single target entry placed in the "custom" -> "url" list.
    target_entry = {
        "url": "https://www.facebook.com/share/p/1K6xfHFkrK/",
        "comments_sort": "All comments",
        "limit_records": "",
        "get_all_replies": "",
    }
    payload = {
        "scraper_slug": "01KG3VTPJMC1AYPTBPACKQKJBV",
        "version": "v1.0.7",
        "input": {
            "parameters": {
                "system": system_settings,
                "custom": {"url": [target_entry]},
            },
        },
        "callback_url": "https://your-domain.com/callback",
    }

    print("Sending request to API...")
    outcome = run_scraper(payload, API_KEY)

    # Report the failure first and bail out; the success path follows.
    if not outcome["success"]:
        print("Request failed!")
        print(f"Error message: {outcome['error']}")
        return

    print("Worker run successful!")
    print(f"Run ID: {outcome['run_slug']}")
    print("You can use this ID to query run status and results")
# Run the sample request only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()