#!/usr/bin/env python3
import requests
import json
from typing import Dict, Any, Optional
# Endpoint that run_scraper() POSTs the run request to.
API_URL = "https://openapi.coreclaw.com/api/v1/scraper/run"
# API key sent in the "api-key" request header; replace the placeholder
# with your own key before running.
API_KEY = "<YOUR_API_KEY>"
# HTTP request timeout in seconds, passed as `timeout=` to requests.post().
TIMEOUT = 30
def _failure(message: str) -> Dict[str, Any]:
    """Build the uniform failure result returned by run_scraper()."""
    return {
        "success": False,
        "run_slug": None,
        "error": message,
    }


def run_scraper(params: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Submit a scraper run request and summarise the outcome.

    POSTs ``params`` as a JSON body to ``API_URL`` with ``api_key`` in the
    ``api-key`` header, waiting at most ``TIMEOUT`` seconds.

    Args:
        params: Request payload; serialised as the JSON request body.
        api_key: Value sent in the ``api-key`` header.

    Returns:
        A dict with exactly three keys:

        * ``success`` -- True only when the HTTP status is 200 and the
          response body's ``code`` field equals 0.
        * ``run_slug`` -- ``data.run_slug`` from the response on success
          (None if absent); always None on failure.
        * ``error`` -- None on success, otherwise a human-readable message.

    Transport errors, non-200 statuses, undecodable bodies and non-zero
    business codes are all reported through the returned dict rather than
    raised.
    """
    headers = {
        "api-key": api_key,
        "Content-Type": "application/json",
    }

    # Only the network call sits in this try block, so that a JSON decoding
    # failure below is never mistaken for a transport error.
    try:
        response = requests.post(
            API_URL,
            headers=headers,
            json=params,
            timeout=TIMEOUT,
        )
    except requests.exceptions.Timeout:
        return _failure(f"Request timeout after {TIMEOUT} seconds")
    except requests.exceptions.RequestException as e:
        return _failure(f"Request error: {e}")

    # Check HTTP status code
    if response.status_code != 200:
        return _failure(
            f"HTTP error: {response.status_code} - {response.text}"
        )

    # Parse response. Every decode error response.json() can raise
    # (json / simplejson / requests' own JSONDecodeError) derives from
    # ValueError, so catching ValueError covers all requests versions.
    try:
        result = response.json()
    except ValueError as e:
        return _failure(f"JSON decode error: {e}")

    # A valid JSON body that is not an object has no "code"/"data" fields.
    if not isinstance(result, dict):
        return _failure(
            f"Unexpected response format: expected a JSON object, "
            f"got {type(result).__name__}"
        )

    # Check business error code
    code = result.get("code")
    if code != 0:
        message = result.get("message", "Unknown error")
        return _failure(f"Business error: {message} (code: {code})")

    # "data" may be missing or null; only read run_slug from a real object.
    data = result.get("data")
    run_slug = data.get("run_slug") if isinstance(data, dict) else None

    return {
        "success": True,
        "run_slug": run_slug,
        "error": None,
    }
def main():
    """Build a sample run request, submit it, and print the outcome.

    Calls run_scraper() with the payload below and the module-level
    API_KEY, then prints either the returned run slug or the error message.
    """
    # Build request parameters.
    # NOTE: booleans must be Python True/False (the JSON spellings
    # true/false are undefined names in Python); requests serialises them
    # to JSON true/false when sending the body.
    request_params = {
        "scraper_slug": "01KG2DV66JTCN65ZBTRX3M456E",
        "version": "v1.0.8",
        "input": {
            "parameters": {
                "system": {
                    "proxy_region": "",
                    "cpus": 0.125,
                    "memory": 512,
                    "execute_limit_time_seconds": 1800,
                    "max_total_charge": 0,
                    "max_total_traffic": 0,
                },
                "custom": {
                    "runUnits": [
                        {
                            "url": "https://coreclaw.local/__single_run__"
                        }
                    ],
                    "scenario": "ecommerce-products",
                    "fields": [
                        {
                            "string": "productId"
                        },
                        {
                            "string": "sku"
                        },
                    ],
                    "mergeStrategy": "keep-newest",
                    "timestampField": "updatedAt",
                    "dataSourceType": "direct-input",
                    # Inline input records, passed as a JSON-encoded string.
                    "inputData": "[{\"productId\": \"P001\", \"sku\": \"SKU-A-BLACK\", \"name\": \"无线蓝牙耳机 Pro\", \"price\": 299.00, \"stock\": 156, \"source\": \"京东旗舰店\", \"updatedAt\": \"2024-01-20T10:30:00\"}, {\"productId\": \"P001\", \"sku\": \"SKU-A-BLACK\", \"name\": \"无线蓝牙耳机 Pro (黑)\", \"price\": 279.00, \"stock\": 200, \"source\": \"天猫旗舰店\", \"updatedAt\": \"2024-01-22T14:20:00\"}, {\"productId\": \"P001\", \"sku\": \"SKU-A-WHITE\", \"name\": \"无线蓝牙耳机 Pro\", \"price\": 299.00, \"stock\": 88, \"source\": \"京东旗舰店\", \"updatedAt\": \"2024-01-20T10:30:00\"}, {\"productId\": \"P002\", \"sku\": \"SKU-B\", \"name\": \"智能手表 Ultra\", \"price\": 1299.00, \"stock\": 45, \"source\": \"官网\", \"updatedAt\": \"2024-01-18T09:00:00\"}]",
                    "inputUrls": [
                        {
                            "url": "https://raw.githubusercontent.com/kael-odin/worker-dedup-datasets/main/test/data1.json"
                        }
                    ],
                    "datasetIds": [],
                    "inputFormat": "json",
                    "output": "unique-items",
                    "generateReport": True,
                    "mode": "dedup-after-load",
                    "fieldsToLoad": [],
                    "nullAsUnique": False,
                    "parallelLoads": 10,
                    "parallelPushes": 5,
                    "batchSize": 5000,
                    "appendFileSource": False,
                    "verboseLog": False,
                },
            }
        },
        "callback_url": "https://your-domain.com/callback",
    }

    # Send request
    print("Sending request to API...")
    result = run_scraper(request_params, API_KEY)

    # Handle result
    if result["success"]:
        print("Worker run successful!")
        print(f"Run ID: {result['run_slug']}")
        print("You can use this ID to query run status and results")
    else:
        print("Request failed!")
        print(f"Error message: {result['error']}")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()