ha
I need Python code that provides: 1) a GenAI client factory; 2) configuration read from config.json; 3) execution driven by a use case; 4) a pool of GenAI clients per use case; 5) GenAI clients from multiple providers; 6) provider-specific logic for calling the LLM and generating text; 7) on each use-case call, a client served from the pool in round-robin order.
import json
import os
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
# Install deps: pip install groq openai ollama
@dataclass
class GenerationResponse:
    """Result of a single text-generation call.

    Attributes:
        content: Text returned by the model.
        provider: Short name of the provider that served the call.
        model: Name of the model that produced ``content``.
    """

    content: str
    provider: str
    model: str
class GenAIProvider(ABC):
    """Abstract base for GenAI providers."""

    @abstractmethod
    def generate(self, prompt: str, model: Optional[str] = None) -> "GenerationResponse":
        """Generate text for ``prompt``.

        Args:
            prompt: User prompt to send to the model.
            model: Model name to use; ``None`` lets the provider pick its
                own default.

        Returns:
            A ``GenerationResponse`` describing the generated text.
        """
class GroqProvider(GenAIProvider):
    """Provider that generates text through the Groq chat-completions API."""

    def __init__(self, config: Dict[str, Any]):
        """Build the Groq client from ``config``.

        Args:
            config: Provider settings; the ``api_key`` and ``models`` keys
                are required.

        Raises:
            KeyError: If ``api_key`` or ``models`` is missing from ``config``.
        """
        # Imported here so the SDK is only loaded when this provider is built.
        from groq import Groq

        self.client = Groq(api_key=config['api_key'])
        self.models = config['models']

    def generate(self, prompt: str, model: Optional[str] = None) -> GenerationResponse:
        """Send ``prompt`` as a single user message and return the reply.

        Args:
            prompt: User prompt text.
            model: Model name; a falsy value selects the first configured
                model.

        Returns:
            A ``GenerationResponse`` holding the first choice's message
            content, tagged with provider ``"groq"``.

        Raises:
            IndexError: If no model is given and ``models`` is empty.
        """
        model = model or self.models[0]
        chat_completion = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return GenerationResponse(
            content=chat_completion.choices[0].message.content,
            provider="groq",
            model=model,
        )
class OpenAIProvider(GenAIProvider):
    """Provider that generates text through the OpenAI chat-completions API."""

    def __init__(self, config: Dict[str, Any]):
        """Build the OpenAI client from ``config``.

        Args:
            config: Provider settings; the ``api_key`` and ``models`` keys
                are required.

        Raises:
            KeyError: If ``api_key`` or ``models`` is missing from ``config``.
        """
        # Imported here so the SDK is only loaded when this provider is built.
        from openai import OpenAI

        self.client = OpenAI(api_key=config['api_key'])
        self.models = config['models']

    def generate(self, prompt: str, model: Optional[str] = None) -> GenerationResponse:
        """Send ``prompt`` as a single user message and return the reply.

        Args:
            prompt: User prompt text.
            model: Model name; a falsy value selects the first configured
                model.

        Returns:
            A ``GenerationResponse`` holding the first choice's message
            content, tagged with provider ``"openai"``.

        Raises:
            IndexError: If no model is given and ``models`` is empty.
        """
        model = model or self.models[0]
        chat_completion = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return GenerationResponse(
            content=chat_completion.choices[0].message.content,
            provider="openai",
            model=model,
        )
class OllamaProvider(GenAIProvider):
    """Provider that generates text through an Ollama server."""

    def __init__(self, config: Dict[str, Any]):
        """Set up the Ollama client from ``config``.

        Args:
            config: Provider settings; ``models`` is required and
                ``base_url`` is optional.

        Raises:
            KeyError: If ``models`` is missing from ``config``.
        """
        # Imported here so the package is only loaded when this provider is built.
        import ollama

        self.base_url = config.get('base_url')
        self.models = config['models']
        # The module-level ollama functions talk to the library's default
        # host, so a configured base_url only takes effect through a
        # dedicated Client; without one, keep using the module itself.
        self.client = ollama.Client(host=self.base_url) if self.base_url else ollama

    def generate(self, prompt: str, model: Optional[str] = None) -> GenerationResponse:
        """Send ``prompt`` as a single user message and return the reply.

        Args:
            prompt: User prompt text.
            model: Model name; a falsy value selects the first configured
                model.

        Returns:
            A ``GenerationResponse`` holding the reply's message content,
            tagged with provider ``"ollama"``.

        Raises:
            IndexError: If no model is given and ``models`` is empty.
        """
        model = model or self.models[0]
        response = self.client.chat(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return GenerationResponse(
            content=response['message']['content'],
            provider="ollama",
            model=model,
        )
class GenAIClientFactory:
    """Factory to create provider clients from config."""

    # Maps a provider name used in the config file to its implementation.
    PROVIDER_CLASSES = {
        'groq': GroqProvider,
        'openai': OpenAIProvider,
        'ollama': OllamaProvider,
    }

    @classmethod
    def from_config(cls, config_path: str = 'config.json') -> Dict[str, GenAIProvider]:
        """Instantiate every known provider listed in the config file.

        Args:
            config_path: Path to a JSON file whose ``providers`` object maps
                provider names to their settings.

        Returns:
            A dict mapping provider name to the constructed provider.
            Names that have no entry in ``PROVIDER_CLASSES`` are skipped
            and reported with a warning.

        Raises:
            OSError: If the config file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If the config has no ``providers`` key.
        """
        import logging

        # Explicit encoding: the platform default is not always UTF-8.
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)

        providers = {}
        for name, cfg in config['providers'].items():
            provider_class = cls.PROVIDER_CLASSES.get(name)
            if provider_class is None:
                # Still best-effort, but no longer silent: a typo in the
                # config would otherwise just make the provider disappear.
                logging.getLogger(__name__).warning(
                    "Skipping unknown provider %r in %s", name, config_path
                )
                continue
            providers[name] = provider_class(cfg)
        return providers
class UseCaseManager:
    """Manages pool of clients per usecase with round-robin selection."""

    def __init__(self, all_clients: Dict[str, "GenAIProvider"], config: Dict[str, Any]):
        """Build one client pool per usecase.

        Args:
            all_clients: Available clients keyed by provider name.
            config: Parsed configuration; ``config['usecases']`` maps each
                usecase name to a dict with a ``clients`` list of provider
                names.

        Raises:
            KeyError: If ``usecases`` is missing, or a usecase entry has no
                ``clients`` key.
        """
        self.clients = all_clients
        self.usecases = config['usecases']
        self.client_pools: Dict[str, List["GenAIProvider"]] = {}
        self.current_indices: Dict[str, int] = {}
        self._build_pools()

    def _build_pools(self):
        """Fill ``client_pools`` and reset every round-robin index to 0."""
        for usecase_name, usecase_config in self.usecases.items():
            # Names that are not present in ``self.clients`` are left out of
            # the pool; an empty pool is reported later by ``get_client``.
            self.client_pools[usecase_name] = [
                self.clients[name]
                for name in usecase_config['clients']
                if name in self.clients
            ]
            self.current_indices[usecase_name] = 0

    def get_client(self, usecase: str) -> "GenAIProvider":
        """Round-robin client selection for usecase.

        Args:
            usecase: Name of a usecase from the configuration.

        Returns:
            The next client in the usecase's pool; the position wraps back
            to the first client after the last one has been served.

        Raises:
            ValueError: If the usecase is unknown or its pool is empty.
        """
        if usecase not in self.client_pools:
            raise ValueError(f"Unknown usecase: {usecase}")
        pool = self.client_pools[usecase]
        if not pool:
            raise ValueError(f"No clients available for usecase: {usecase}")
        position = self.current_indices[usecase]
        self.current_indices[usecase] = (position + 1) % len(pool)
        return pool[position]
# Usage Example
if __name__ == "__main__":
    # 1. Factory creates all clients
    all_clients = GenAIClientFactory.from_config()

    # 2. UseCaseManager loads pools from config
    # (context manager so the config file handle is closed after parsing)
    with open('config.json', encoding='utf-8') as config_file:
        manager = UseCaseManager(all_clients, json.load(config_file))

    # 3. Drive from usecase - round-robin serves clients
    prompt = "Explain round-robin load balancing."
    for i in range(5):  # Simulates 5 calls
        client = manager.get_client("chat")
        response = client.generate(prompt)
        print(f"Call {i+1}: {response.provider} ({response.model}) - {response.content[:100]}...")
# GroqProvider (updated)
class GroqProvider(GenAIProvider):
    """Groq provider that implements only the async generation hook.

    NOTE(review): ``super().__init__(config)`` and ``_async_generate`` match
    the ``GenAIProvider`` variant whose constructor accepts ``config``;
    confirm that variant is the base class in scope where this class is
    defined.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base class and create the async Groq client.

        Args:
            config: Provider settings; ``api_key`` is required here, the
                rest is handled by the base class.

        Raises:
            KeyError: If ``api_key`` is missing from ``config``.
        """
        # Imported here so the SDK is only loaded when this provider is built.
        from groq import AsyncGroq

        super().__init__(config)
        self.client = AsyncGroq(api_key=config['api_key'])

    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: User prompt text.
            model: Model name to call.
            **kwargs: Forwarded unchanged to ``chat.completions.create``.

        Returns:
            The message content of the first choice.
        """
        chat_completion = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs,
        )
        return chat_completion.choices[0].message.content
# OpenAIProvider (similar)
class OpenAIProvider(GenAIProvider):
    """OpenAI provider that implements only the async generation hook.

    NOTE(review): ``super().__init__(config)`` and ``_async_generate`` match
    the ``GenAIProvider`` variant whose constructor accepts ``config``;
    confirm that variant is the base class in scope where this class is
    defined.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base class and create the async OpenAI client.

        Args:
            config: Provider settings; ``api_key`` is required here, the
                rest is handled by the base class.

        Raises:
            KeyError: If ``api_key`` is missing from ``config``.
        """
        # Imported here so the SDK is only loaded when this provider is built.
        from openai import AsyncOpenAI

        super().__init__(config)
        self.client = AsyncOpenAI(api_key=config['api_key'])

    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: User prompt text.
            model: Model name to call.
            **kwargs: Forwarded unchanged to ``chat.completions.create``.

        Returns:
            The message content of the first choice.
        """
        chat_completion = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs,
        )
        return chat_completion.choices[0].message.content
# OllamaProvider
class OllamaProvider(GenAIProvider):
    """Ollama provider that implements only the async generation hook.

    NOTE(review): ``super().__init__(config)`` and ``_async_generate`` match
    the ``GenAIProvider`` variant whose constructor accepts ``config``;
    confirm that variant is the base class in scope where this class is
    defined.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base class and select the Ollama client.

        Args:
            config: Provider settings; an optional ``base_url`` selects the
                Ollama host, the rest is handled by the base class.
        """
        # Imported here so the package is only loaded when this provider is built.
        import ollama

        super().__init__(config)
        base_url = config.get('base_url')
        # Sync client; the blocking call is moved off the event loop in
        # _async_generate. A dedicated Client is needed for a custom host.
        self.client = ollama.Client(host=base_url) if base_url else ollama

    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        The synchronous ``chat`` call runs in the default executor so it does
        not block the event loop while waiting for the server.

        Args:
            prompt: User prompt text.
            model: Model name to call.
            **kwargs: Accepted for signature compatibility; not forwarded to
                ``chat``.

        Returns:
            The ``content`` of the reply message.
        """
        import asyncio
        import functools

        blocking_call = functools.partial(
            self.client.chat,
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        response = await asyncio.get_running_loop().run_in_executor(None, blocking_call)
        return response['message']['content']
import abc
import asyncio
import logging
import time
from typing import Dict, Any, Optional, AsyncGenerator, Generator
from dataclasses import dataclass
import backoff # pip install backoff
# Configure root logging at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GenerationResponse:
    """Result of a single text-generation call.

    Attributes:
        content: Text returned by the model.
        provider: Name of the provider that served the call.
        model: Name of the model that produced ``content``.
        tokens_used: Token count for the call, or ``None`` when not reported.
        latency_ms: Call duration in milliseconds, or ``None`` when not
            measured.
    """

    content: str
    provider: str
    model: str
    tokens_used: Optional[int] = None
    latency_ms: Optional[float] = None
class GenAIProvider(abc.ABC):
    """Robust abstract base for all GenAI providers with retries, logging, and async.

    Subclasses implement ``_async_generate``; this class adds model
    validation, a synchronous ``generate`` wrapper with retries, and
    streaming helpers.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read the shared provider settings from ``config``.

        Args:
            config: Provider settings. ``models`` must be a non-empty list.
                Optional keys: ``default_model`` (defaults to the first
                model), ``max_retries`` (default 3), ``timeout`` (default
                30.0) and ``rate_limit_delay`` (default 0.1).

        Raises:
            ValueError: If ``models`` is missing or empty.
        """
        self.config = config
        self.models = config.get('models', [])
        if not self.models:
            raise ValueError("No models specified in config")
        self.default_model = config.get('default_model', self.models[0])
        # Used by generate() as the total number of attempts.
        self.max_retries = config.get('max_retries', 3)
        # NOTE(review): timeout and rate_limit_delay are stored here but are
        # not applied anywhere in this class.
        self.timeout = config.get('timeout', 30.0)
        self.rate_limit_delay = config.get('rate_limit_delay', 0.1)

    def _validate_model(self, model: Optional[str]) -> str:
        """Validate and return model.

        Args:
            model: Requested model; a falsy value selects ``default_model``.

        Returns:
            The model name to use.

        Raises:
            ValueError: If the resulting model is not in ``self.models``.
        """
        model = model or self.default_model
        if model not in self.models:
            raise ValueError(f"Invalid model '{model}'. Allowed: {self.models}")
        return model

    def generate(self, prompt: str, model: Optional[str] = None, **kwargs) -> "GenerationResponse":
        """Synchronous generation with retries and logging.

        Each attempt runs ``_async_generate`` in a fresh event loop via
        ``asyncio.run``. Failed attempts are retried with exponential backoff
        for up to ``max_retries`` attempts in total (values below 1 are
        treated as a single attempt).

        Args:
            prompt: User prompt text.
            model: Model name; a falsy value selects ``default_model``.
            **kwargs: Forwarded to ``_async_generate``.

        Returns:
            A ``GenerationResponse`` whose ``latency_ms`` is the duration of
            the successful attempt.

        Raises:
            ValueError: If the model is not allowed (raised immediately,
                never retried).
            RuntimeError: If called while an event loop is already running
                in this thread.
            Exception: The last provider error once all attempts failed.
        """
        # Configuration errors are deterministic, so check them before the
        # retry loop instead of backing off on them.
        model = self._validate_model(model)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop is running, so asyncio.run() below is usable.
        else:
            raise RuntimeError(
                "generate() cannot be called from a running event loop; "
                "await _async_generate() instead"
            )

        def attempt():
            # One provider call; returns (content, latency in milliseconds).
            logger.info(
                "Generating with %s model=%s, prompt_len=%d",
                self.__class__.__name__, model, len(prompt),
            )
            started = time.perf_counter()
            content = asyncio.run(self._async_generate(prompt, model, **kwargs))
            return content, (time.perf_counter() - started) * 1000

        # Wrapped at call time so the instance's max_retries is honoured
        # rather than a constant fixed when the class was defined.
        with_retries = backoff.on_exception(
            backoff.expo, Exception, max_tries=max(1, self.max_retries)
        )(attempt)

        try:
            content, latency = with_retries()
        except Exception as e:
            logger.error("Generation failed after retries: %s", e)
            raise

        logger.info("Success: latency=%.2fms", latency)
        return GenerationResponse(
            content=content,
            provider=self.__class__.__name__.lower(),
            model=model,
            latency_ms=latency,
        )

    @abc.abstractmethod
    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Provider-specific async generation (must implement).

        Args:
            prompt: User prompt text.
            model: Already-validated model name.
            **kwargs: Extra provider-specific options.

        Returns:
            The generated text.
        """

    async def agenerate_stream(self, prompt: str, model: Optional[str] = None, **kwargs) -> AsyncGenerator[str, None]:
        """Async streaming (optional override).

        The default implementation does not stream: it awaits the full
        result of ``_async_generate`` and yields it as a single chunk.

        Raises:
            ValueError: If the model is not allowed.
        """
        model = self._validate_model(model)
        logger.warning("Streaming not implemented; falling back to full generate.")
        full = await self._async_generate(prompt, model, **kwargs)
        yield full

    def generate_stream(self, prompt: str, model: Optional[str] = None, **kwargs) -> Generator[str, None, None]:
        """Sync streaming wrapper.

        Drives ``agenerate_stream`` on a private event loop and yields each
        chunk as soon as it is produced. ``asyncio.run`` cannot be used here
        because it only accepts coroutines, not async generators.

        Args:
            prompt: User prompt text.
            model: Model name; a falsy value selects ``default_model``.
            **kwargs: Forwarded to ``agenerate_stream``.

        Yields:
            Text chunks in the order the async generator produces them.
        """
        loop = asyncio.new_event_loop()
        chunks = self.agenerate_stream(prompt, model, **kwargs)
        try:
            while True:
                try:
                    chunk = loop.run_until_complete(chunks.__anext__())
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            # Also reached when the consumer stops early: close the async
            # generator first, then release the loop.
            try:
                loop.run_until_complete(chunks.aclose())
            finally:
                loop.close()

    def __repr__(self):
        """Return the class name and the first two configured models."""
        return f"{self.__class__.__name__}(models={self.models[:2]}...)"
Comments
Post a Comment