ha

I need a python code: 1) A genai client factory 2) config file in config.json, 3) will drive from a usecase 4) usecase will have a pool of genai clients 5) genai client of multiple provider 6)genai provider has custom logic to call LLM and generate text. 7) At at usecase call, a client will be served on round robin logic


import json
import os
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from dataclasses import dataclass

# Install deps: pip install groq openai ollama

@dataclass
class GenerationResponse:
    content: str
    provider: str
    model: str

class GenAIProvider(ABC):
    """Abstract base for GenAI providers."""
    
    @abstractmethod
    def generate(self, prompt: str, model: str = None) -> GenerationResponse:
        pass

class GroqProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        from groq import Groq
        self.client = Groq(api_key=config['api_key'])
        self.models = config['models']
    
    def generate(self, prompt: str, model: str = None) -> GenerationResponse:
        model = model or self.models[0]
        chat_completion = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return GenerationResponse(
            content=chat_completion.choices[0].message.content,
            provider="groq",
            model=model
        )

class OpenAIProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        from openai import OpenAI
        self.client = OpenAI(api_key=config['api_key'])
        self.models = config['models']
    
    def generate(self, prompt: str, model: str = None) -> GenerationResponse:
        model = model or self.models[0]
        chat_completion = self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        return GenerationResponse(
            content=chat_completion.choices[0].message.content,
            provider="openai",
            model=model
        )

class OllamaProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        import ollama
        self.client = ollama
        self.base_url = config.get('base_url')
        self.models = config['models']
    
    def generate(self, prompt: str, model: str = None) -> GenerationResponse:
        model = model or self.models[0]
        response = self.client.chat(model=model, messages=[{"role": "user", "content": prompt}])
        return GenerationResponse(
            content=response['message']['content'],
            provider="ollama",
            model=model
        )

class GenAIClientFactory:
    """Factory to create provider clients from config."""
    
    PROVIDER_CLASSES = {
        'groq': GroqProvider,
        'openai': OpenAIProvider,
        'ollama': OllamaProvider
    }
    
    @classmethod
    def from_config(cls, config_path: str = 'config.json') -> Dict[str, GenAIProvider]:
        with open(config_path) as f:
            config = json.load(f)
        providers = {}
        for name, cfg in config['providers'].items():
            provider_class = cls.PROVIDER_CLASSES.get(name)
            if provider_class:
                providers[name] = provider_class(cfg)
        return providers

class UseCaseManager:
    """Manages pool of clients per usecase with round-robin selection."""
    
    def __init__(self, all_clients: Dict[str, GenAIProvider], config: Dict[str, Any]):
        self.clients = all_clients
        self.usecases = config['usecases']
        self.client_pools: Dict[str, List[GenAIProvider]] = {}
        self.current_indices: Dict[str, int] = {}
        self._build_pools()
    
    def _build_pools(self):
        for usecase_name, usecase_config in self.usecases.items():
            client_names = usecase_config['clients']
            self.client_pools[usecase_name] = [self.clients[name] for name in client_names if name in self.clients]
            self.current_indices[usecase_name] = 0
    
    def get_client(self, usecase: str) -> GenAIProvider:
        """Round-robin client selection for usecase."""
        if usecase not in self.client_pools:
            raise ValueError(f"Unknown usecase: {usecase}")
        pool = self.client_pools[usecase]
        if not pool:
            raise ValueError(f"No clients available for usecase: {usecase}")
        client = pool[self.current_indices[usecase]]
        self.current_indices[usecase] = (self.current_indices[usecase] + 1) % len(pool)
        return client

# Usage Example
if __name__ == "__main__":
    # 1. Factory creates all clients
    all_clients = GenAIClientFactory.from_config()
    
    # 2. UseCaseManager loads pools from config
    manager = UseCaseManager(all_clients, json.load(open('config.json')))
    
    # 3. Drive from usecase - round-robin serves clients
    prompt = "Explain round-robin load balancing."
    for i in range(5): # Simulates 5 calls
        client = manager.get_client("chat")
        response = client.generate(prompt)
        print(f"Call {i+1}: {response.provider} ({response.model}) - {response.content[:100]}...")[web:13]



# GroqProvider (updated)
class GroqProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        from groq import AsyncGroq
        super().__init__(config)
        self.client = AsyncGroq(api_key=config['api_key'])
    
    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        chat_completion = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        return chat_completion.choices[0].message.content

# OpenAIProvider (similar)
class OpenAIProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        from openai import AsyncOpenAI
        super().__init__(config)
        self.client = AsyncOpenAI(api_key=config['api_key'])
    
    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        chat_completion = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        return chat_completion.choices[0].message.content

# OllamaProvider
class OllamaProvider(GenAIProvider):
    def __init__(self, config: Dict[str, Any]):
        import ollama
        super().__init__(config)
        self.client = ollama # Sync, but wrapped async
    
    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        response = self.client.chat(model=model, messages=[{"role": "user", "content": prompt}])
        return response['message']['content']

.import abc
import asyncio
import logging
import time
from typing import Dict, Any, Optional, AsyncGenerator, Generator
from dataclasses import dataclass
import backoff # pip install backoff

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class GenerationResponse:
    content: str
    provider: str
    model: str
    tokens_used: Optional[int] = None
    latency_ms: Optional[float] = None

class GenAIProvider(abc.ABC):
    """Robust abstract base for all GenAI providers with retries, logging, and async."""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.models = config.get('models', [])
        self.default_model = config.get('default_model', self.models[0] if self.models else None)
        self.max_retries = config.get('max_retries', 3)
        self.timeout = config.get('timeout', 30.0)
        self.rate_limit_delay = config.get('rate_limit_delay', 0.1)
        
        if not self.models:
            raise ValueError("No models specified in config")
    
    def _validate_model(self, model: Optional[str]) -> str:
        """Validate and return model."""
        model = model or self.default_model
        if model not in self.models:
            raise ValueError(f"Invalid model '{model}'. Allowed: {self.models}")
        return model
    
    @backoff.on_exception(backoff.expo, Exception, max_tries=3)
    def generate(self, prompt: str, model: Optional[str] = None, **kwargs) -> GenerationResponse:
        """Synchronous generation with retries and logging."""
        start_time = time.time()
        try:
            model = self._validate_model(model)
            logger.info(f"Generating with {self.__class__.__name__} model={model}, prompt_len={len(prompt)}")
            
            content = asyncio.run(self._async_generate(prompt, model, **kwargs))
            
            latency = (time.time() - start_time) * 1000
            logger.info(f"Success: latency={latency:.2f}ms")
            
            return GenerationResponse(
                content=content,
                provider=self.__class__.__name__.lower(),
                model=model,
                latency_ms=latency
            )
        except Exception as e:
            logger.error(f"Generation failed after retries: {e}")
            raise
    
    @abc.abstractmethod
    async def _async_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Provider-specific async generation (must implement)."""
        pass
    
    async def agenerate_stream(self, prompt: str, model: Optional[str] = None, **kwargs) -> AsyncGenerator[str, None]:
        """Async streaming (optional override)."""
        model = self._validate_model(model)
        logger.warning("Streaming not implemented; falling back to full generate.")
        full = await self._async_generate(prompt, model, **kwargs)
        yield full
    
    def generate_stream(self, prompt: str, model: Optional[str] = None, **kwargs) -> Generator[str, None, None]:
        """Sync streaming wrapper."""
        async def stream():
            async for chunk in self.agenerate_stream(prompt, model, **kwargs):
                yield chunk
        for chunk in asyncio.run(stream()):
            yield chunk
    
    def __repr__(self):
        return f"{self.__class__.__name__}(models={self.models[:2]}...)"



Comments

Popular posts from this blog

যমুনার বৃত্তান্ত: মর্মরের কারাগার

লেখকের নিয়তি

লীলা মজুমদার - আনন্দের জগৎ