Developer Guide
Complete technical documentation for DataGenFlow developers.
Table of Contents
Architecture
Project Structure
lib/
blocks/
builtin/ # Stable blocks (llm, validator, json_validator, output)
custom/ # Experimental blocks
base.py # BaseBlock interface
registry.py # Auto-discovery engine
templates/ # Pipeline templates (YAML)
errors.py # Custom exception classes
workflow.py # Pipeline execution with tracing
storage.py # Database operations (aiosqlite)
generator.py # LLM wrapper (OpenAI-compatible)
template_renderer.py # Jinja2 template rendering
frontend/
src/
pages/
Pipelines.tsx # Visual pipeline builder and manager
Generator.tsx # Dataset generation with progress tracking
Review.tsx # Review records with execution traces
components/
pipeline-editor/ # ReactFlow-based visual editor
tests/
conftest.py # Test configuration and fixtures
blocks/ # Block unit tests
test_api.py # API endpoint tests
test_workflow.py # Pipeline execution tests
test_storage.py # Database operations tests
test_errors.py # Error handling tests
Core Concepts
Block System
- Blocks are the fundamental building units
- Each block declares inputs and outputs
- Blocks execute asynchronously and return dictionaries
- Output validation enforced at runtime
Pipeline Execution
- Sequential execution with accumulated state
- Full trace capture (inputs, outputs, timing, errors)
- Correlation IDs for request tracking
- Graceful error handling with context
Storage Layer
- SQLite with aiosqlite for async operations
- Separate tables: pipelines, jobs, records
- Automatic schema migrations
- Foreign key constraints for data integrity
Development Setup
Prerequisites
- Python 3.10+
- Node.js 20+ and Yarn
- uv (Python package manager)
Installation
# Clone repository
git clone <repository-url>
cd DataGenFlow
# Install dependencies
make dev
# Run tests
make test
Development Workflow
# Start development servers (hot reload)
make run-dev
# Run separately
make dev-backend # Backend only (port 8000)
make dev-ui # Frontend only (port 5173)
# Code quality
make lint # Backend linting (ruff)
make typecheck # Type checking (mypy)
make format # Format code (ruff)
make lint-frontend # Frontend linting (ESLint)
make typecheck-frontend # TypeScript type checking
# Production build
make build-ui # Build frontend
make run # Run production server
Testing
Running Tests
# All tests (93 tests)
make test
# Specific test suites
uv run pytest tests/blocks/ -v
uv run pytest tests/test_api.py -v
uv run pytest tests/test_workflow.py -v
# With coverage
uv run pytest --cov=lib --cov=app tests/
Test Database
Tests use a separate in-memory database that is automatically created and cleaned up after each test session.
Writing Tests
import pytest
@pytest.mark.asyncio
async def test_pipeline_execution():
"""Test pipeline execution with trace"""
pipeline = Pipeline(...)
result, trace, trace_id = await pipeline.execute({"input": "test"})
assert "output" in result
assert len(trace) > 0
assert trace_id is not None
API Reference
Blocks API
List all blocks
GET /api/blocks
Response:
[
{
"type": "TextGenerator",
"name": "LLM Generator",
"description": "Generate text using LLM",
"inputs": ["system", "user"],
"outputs": ["assistant"],
"config": {
"temperature": "float",
"max_tokens": "int"
}
}
]
Pipelines API
Create pipeline
POST /api/pipelines
Content-Type: application/json
{
"name": "My Pipeline",
"blocks": [
{
"type": "TextGenerator",
"config": {"temperature": 0.7}
}
]
}
Execute pipeline
POST /api/pipelines/{id}/execute
Content-Type: application/json
{"text": "input data"}
Response:
{
"result": {"output": "..."},
"trace": [
{
"block_type": "TextGenerator",
"input": {...},
"output": {...},
"accumulated_state": {...},
"execution_time": 1.234
}
],
"trace_id": "uuid"
}
Jobs API
Start generation job
POST /api/generate
Content-Type: multipart/form-data
file=@seeds.json
pipeline_id=1
Get job status
GET /api/jobs/{job_id}
Response:
{
"id": 1,
"pipeline_id": 1,
"status": "running",
"total_seeds": 100,
"records_generated": 45,
"records_failed": 2
}
Records API
List records
GET /api/records?status=pending&limit=100&job_id=1
Update record
PUT /api/records/{id}
Content-Type: application/json
{
"status": "accepted",
"output": "updated text"
}
Export records
GET /api/export?status=accepted&job_id=1
GET /api/export/download?status=accepted
Debugging
Debugging Custom Blocks
DataGenFlow provides a simple debugging workflow using VS Code:
- Create your pipeline using the visual editor
- Note the pipeline ID from the UI (click “Debug Instructions” to expand)
- Open
debug_pipeline.pyand set:PIPELINE_IDto your pipeline IDSEED_DATAwith test input
- Set breakpoints in your custom blocks
- Press F5 in VS Code and select “Debug Pipeline”
The debugger will stop at your breakpoints, allowing you to inspect variables, step through code, and debug your custom block logic.
Tips:
- Use the “Copy” button next to pipeline ID in the UI
- Edit seed data directly in debug_pipeline.py for fast iteration
- The pipeline executes exactly as it would in production
- No need to rebuild frontend - pipelines persist in database
Debug Mode
Enable detailed logging in .env:
DEBUG=true
Features:
- Correlation IDs in all logs
- Per-block execution timing
- Full input/output state logging
- Stack traces with context
Example logs:
2025-10-14 10:15:32 [INFO] [a1b2c3d4] Pipeline 'Data Gen' started (3 blocks)
2025-10-14 10:15:32 [DEBUG] [a1b2c3d4] Block 1/3: TextGenerator executing
2025-10-14 10:15:35 [DEBUG] [a1b2c3d4] TextGenerator completed (3.124s)
2025-10-14 10:15:35 [INFO] [a1b2c3d4] Pipeline completed successfully
Error Handling
Custom Exceptions
BlockNotFoundError: Block type not registeredBlockExecutionError: Runtime execution failureValidationError: Output validation failure
All exceptions include:
- Structured error message
- Context dictionary with details
- HTTP-appropriate status codes
Error Response Format:
{
"error": "Block 'InvalidBlock' not found",
"detail": {
"block_type": "InvalidBlock",
"available_blocks": ["TextGenerator", "ValidatorBlock", ...]
}
}
Code Quality
Type Checking
Mypy Configuration (pyproject.toml):
[tool.mypy]
python_version = "3.10"
strict = true
warn_return_any = true
[[tool.mypy.overrides]]
module = "tests.*"
disable_error_code = ["no-untyped-def"]
Run type checking:
make typecheck # Backend
make typecheck-frontend # Frontend
Linting
Ruff (Backend)
make lint # Check
make format # Fix
ESLint (Frontend)
make lint-frontend # Check
Code Style
- Backend: Follow PEP 8, enforced by ruff
- Frontend: Prettier + ESLint
- Line length: 100 characters
- Imports: Sorted automatically
- Type hints: Required for all public APIs
Custom Block Development
Block Interface
from lib.blocks.base import BaseBlock
from typing import Any
class MyBlock(BaseBlock):
# Required class attributes
name: str = "My Block"
description: str = "What this block does"
inputs: list[str] = ["input_field"]
outputs: list[str] = ["output_field"]
# Optional: Get config schema for UI
def get_schema(self) -> dict[str, Any]:
return {
"my_param": {
"type": "string",
"default": "default_value",
"description": "Parameter description"
}
}
# Required: Execute logic
async def execute(self, data: dict[str, Any]) -> dict[str, Any]:
# Access config
param = self.config.get("my_param", "default")
# Access input
input_value = data["input_field"]
# Your logic here
result = process(input_value, param)
# Return only declared outputs
return {"output_field": result}
Block Discovery
- Create file in
user_blocks/orlib/blocks/custom/ - Inherit from
BaseBlock - Restart server
- Block automatically appears in UI
Registry scans:
lib/blocks/builtin/- Stable blockslib/blocks/custom/- Experimental blocksuser_blocks/- User-created blocks
Testing Custom Blocks
import pytest
from your_block import MyBlock
@pytest.mark.asyncio
async def test_my_block():
block = MyBlock(config={"my_param": "test"})
result = await block.execute({"input_field": "test data"})
assert "output_field" in result
assert result["output_field"] == expected_value
Performance Optimization
Database
- Use
LIMITandOFFSETfor large result sets - Records API supports pagination
- Indexes on frequently queried fields (
status,created_at)
Pipeline Execution
- Blocks execute sequentially (one at a time)
- Use
asynciofor I/O-bound operations - Trace overhead is minimal (~1-2ms per block)
Frontend
- ReactFlow handles large pipelines efficiently
- Record review uses windowed scrolling
- API calls are debounced where appropriate
Deployment
Production Checklist
# Build frontend
make build-ui
# Set production environment
DEBUG=false
LLM_API_KEY=your-production-key
# Run with production server
make run
# Or use systemd/supervisor/docker
uv run uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
Docker (Optional)
FROM python:3.10-slim
WORKDIR /app
COPY . .
RUN pip install uv && uv sync
CMD ["uv", "run", "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Contributing
See CONTRIBUTING for guidelines on:
- Code style and conventions
- PR title format
- Review process
- Testing requirements