Exercícios Práticos: Dominando LLMs na Prática
Estes exercícios complementam o Capítulo 5, oferecendo experiência hands-on com configuração de parâmetros de geração, gerenciamento de context window, otimização de custo/latência e observabilidade em produção. Cada exercício é executável e projetado para reforçar os conceitos apresentados no capítulo.
Exercício 1: Exploração de Parâmetros de Geração
Objetivo: Compreender empiricamente como temperature, top-p e top-k afetam a qualidade e diversidade das respostas geradas.
Por que? Escolher configurações inadequadas pode resultar em respostas repetitivas, incoerentes ou imprecisas. Este exercício permite que você visualize os trade-offs na prática.
Código:
# uv pip install openai python-dotenv
import os
import openai
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
def test_temperature_variations(prompt: str):
"""Testa diferentes valores de temperature"""
temperatures = [0.0, 0.3, 0.7, 1.0, 1.5, 2.0]
print(f"Prompt: {prompt}\n")
print("=" * 80)
for temp in temperatures:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=temp,
max_tokens=100
)
result = response.choices[0].message.content
print(f"\nTemperature = {temp}")
print(f"Resposta: {result}")
print("-" * 80)
def test_sampling_strategies(prompt: str):
"""Compara top-k vs top-p"""
configs = [
{"name": "Greedy (temp=0)", "temperature": 0.0, "top_p": 1.0},
{"name": "Top-p 0.9", "temperature": 0.7, "top_p": 0.9},
{"name": "Top-p 0.5 (focado)", "temperature": 0.7, "top_p": 0.5},
{"name": "Top-p 0.95 (diverso)", "temperature": 1.0, "top_p": 0.95},
]
print(f"Prompt: {prompt}\n")
print("=" * 80)
for config in configs:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=config["temperature"],
top_p=config["top_p"],
max_tokens=100
)
result = response.choices[0].message.content
print(f"\n{config['name']}")
print(f"Resposta: {result}")
print("-" * 80)
# ============================================================================
# Testes
# ============================================================================
# Teste 1: Tarefa factual (prefere temperature baixa)
print("\n### TESTE 1: TAREFA FACTUAL ###\n")
test_temperature_variations(
"Qual a capital da França? Responda em uma frase."
)
# Teste 2: Tarefa criativa (prefere temperature alta)
print("\n\n### TESTE 2: TAREFA CRIATIVA ###\n")
test_temperature_variations(
"Crie um slogan para uma cafeteria futurista em Marte."
)
# Teste 3: Comparação de sampling strategies
print("\n\n### TESTE 3: SAMPLING STRATEGIES ###\n")
test_sampling_strategies(
"Liste 3 vantagens do Python para ciência de dados."
)Desafios:
- Execute o código 3 vezes com o mesmo prompt e temperature=1.0. O que você observa sobre a variabilidade das respostas?
- Modifique o código para adicionar
presence_penalty=0.6efrequency_penalty=0.8. Como isso afeta a diversidade? - Crie um teste para geração de código Python. Qual configuração produz código mais correto: temperature=0.0 ou temperature=0.7?
Reflexão:
- Em que cenários você usaria temperature=0.0 em sistemas de produção?
- Como você balancearia criatividade e precisão em um chatbot de atendimento ao cliente?
- Por que top-p é preferível a top-k em muitos casos?
Exercício 2: Gerenciamento de Context Window
Objetivo: Implementar e comparar estratégias de gerenciamento de contexto: sliding window, summarization e híbrida.
Por que? Em aplicações conversacionais longas (ex: assistentes, agentes multi-turno), o histórico pode exceder rapidamente o context window. Este exercício mostra como manter contexto relevante sem estourar limites.
Código:
# uv pip install openai tiktoken python-dotenv
import os
import openai
import tiktoken
from typing import List, Dict
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
def count_tokens(messages: List[Dict], model: str = "gpt-3.5-turbo") -> int:
"""Conta tokens em uma lista de mensagens"""
encoding = tiktoken.encoding_for_model(model)
num_tokens = 0
for message in messages:
# Overhead de formatação por mensagem
num_tokens += 4
for key, value in message.items():
num_tokens += len(encoding.encode(str(value)))
num_tokens += 2 # Every reply is primed with <im_start>assistant
return num_tokens
def sliding_window_strategy(
messages: List[Dict],
max_messages: int = 10
) -> List[Dict]:
"""Mantém apenas as últimas N mensagens"""
if len(messages) <= max_messages:
return messages
# Preserva system message + últimas max_messages
system_msgs = [m for m in messages if m["role"] == "system"]
recent_msgs = messages[-max_messages:]
return system_msgs + recent_msgs
def summarization_strategy(
messages: List[Dict],
max_tokens: int = 4000
) -> List[Dict]:
"""Sumariza mensagens antigas quando contexto fica muito longo"""
current_tokens = count_tokens(messages)
if current_tokens <= max_tokens:
return messages
# Sumariza mensagens antigas (exceto system e últimas 3)
system_msgs = [m for m in messages if m["role"] == "system"]
to_summarize = messages[len(system_msgs):-3]
recent_msgs = messages[-3:]
# Gera resumo
summary_prompt = f"""Resuma o histórico de conversa abaixo em 2-3 parágrafos,
preservando informações-chave e contexto relevante:
{to_summarize}
"""
summary = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": summary_prompt}],
temperature=0.3,
max_tokens=300
).choices[0].message.content
summary_msg = {
"role": "system",
"content": f"[RESUMO DO HISTÓRICO]: {summary}"
}
return system_msgs + [summary_msg] + recent_msgs
def hybrid_strategy(
messages: List[Dict],
max_messages: int = 8,
max_tokens: int = 3500
) -> List[Dict]:
"""Combina sliding window e summarization"""
# Primeiro aplica sliding window
windowed = sliding_window_strategy(messages, max_messages)
# Depois sumariza se ainda estiver muito longo
return summarization_strategy(windowed, max_tokens)
# ============================================================================
# Simulação de Conversação Longa
# ============================================================================
conversation = [
{"role": "system", "content": "Você é um assistente técnico especializado em Python."},
{"role": "user", "content": "Explique list comprehensions em Python"},
{"role": "assistant", "content": "[Resposta longa sobre list comprehensions - 200 tokens]"},
{"role": "user", "content": "E quanto a generators?"},
{"role": "assistant", "content": "[Resposta longa sobre generators - 250 tokens]"},
{"role": "user", "content": "Como usar decorators?"},
{"role": "assistant", "content": "[Resposta longa sobre decorators - 300 tokens]"},
{"role": "user", "content": "Explique context managers"},
{"role": "assistant", "content": "[Resposta longa sobre context managers - 280 tokens]"},
{"role": "user", "content": "O que são metaclasses?"},
{"role": "assistant", "content": "[Resposta longa sobre metaclasses - 350 tokens]"},
# ... mais 15 rodadas de conversa
]
# Testa cada estratégia
print(f"Tokens originais: {count_tokens(conversation)}")
windowed = sliding_window_strategy(conversation, max_messages=6)
print(f"\nSliding Window (6 msgs): {count_tokens(windowed)} tokens")
print(f"Mensagens preservadas: {len(windowed)}")
summarized = summarization_strategy(conversation, max_tokens=2000)
print(f"\nSummarization: {count_tokens(summarized)} tokens")
print(f"Mensagens finais: {len(summarized)}")
hybrid = hybrid_strategy(conversation, max_messages=6, max_tokens=1800)
print(f"\nHybrid: {count_tokens(hybrid)} tokens")
print(f"Mensagens finais: {len(hybrid)}")Desafios:
- Implemente uma estratégia de token-based trimming que remove mensagens do meio do histórico, preservando as mais antigas e mais recentes.
- Adicione logging para rastrear quando sumarizações ocorrem e quanto contexto é perdido.
- Crie um benchmark comparando latência e custo das 3 estratégias em uma conversação de 50 rodadas.
Reflexão:
- Qual estratégia é melhor para um chatbot de suporte técnico que precisa lembrar de detalhes específicos?
- Como você lidaria com informações críticas (ex: dados do usuário) que não podem ser sumarizadas?
- Quais são os trade-offs entre preservar histórico completo vs. usar RAG para recuperar contexto relevante?
Exercício 3: Otimização de Custo e Latência
Objetivo: Implementar sistema de caching e model routing para reduzir custos e melhorar latência.
Por que? Chamadas LLM são caras (até $0.06/1K tokens) e lentas (500-2000ms). Caching pode reduzir custos em 60-80% e latência em 95%+ para queries repetidas.
Código:
# uv pip install openai redis hashlib python-dotenv
import os
import openai
import redis
import hashlib
import json
import time
from typing import Dict, Any, Optional
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
class LLMCache:
"""Sistema de cache para respostas LLM com Redis"""
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self.hits = 0
self.misses = 0
def _generate_key(self, prompt: str, config: Dict) -> str:
"""Gera chave de cache determinística"""
# Inclui prompt + parâmetros relevantes
key_data = {
"prompt": prompt,
"model": config.get("model", "gpt-3.5-turbo"),
"temperature": config.get("temperature", 0.7),
"max_tokens": config.get("max_tokens", 500)
}
key_string = json.dumps(key_data, sort_keys=True)
return f"llm_cache:{hashlib.sha256(key_string.encode()).hexdigest()}"
def get(self, prompt: str, config: Dict) -> Optional[str]:
"""Busca resposta no cache"""
key = self._generate_key(prompt, config)
result = self.redis.get(key)
if result:
self.hits += 1
return result
self.misses += 1
return None
def set(
self,
prompt: str,
config: Dict,
response: str,
ttl: int = 3600
):
"""Salva resposta no cache com TTL"""
key = self._generate_key(prompt, config)
self.redis.setex(key, ttl, response)
def get_stats(self) -> Dict[str, Any]:
"""Retorna estatísticas de cache"""
total = self.hits + self.misses
hit_rate = self.hits / total if total > 0 else 0
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": hit_rate
}
class ModelRouter:
"""Roteia queries para modelo adequado (model cascading)"""
# Preços por 1K tokens (input / output)
PRICING = {
"gpt-4": (0.03, 0.06),
"gpt-3.5-turbo": (0.0015, 0.002),
}
def classify_complexity(self, prompt: str) -> str:
"""Classifica complexidade da query"""
# Heurísticas simples (em produção, usar modelo classificador)
keywords_simple = ["o que é", "defina", "liste", "qual", "quando"]
keywords_complex = ["analise", "compare", "explique detalhadamente", "raciocine"]
prompt_lower = prompt.lower()
if any(kw in prompt_lower for kw in keywords_complex):
return "complex"
elif any(kw in prompt_lower for kw in keywords_simple):
return "simple"
else:
return "medium"
def select_model(self, prompt: str) -> str:
"""Seleciona modelo apropriado"""
complexity = self.classify_complexity(prompt)
# Cascading: queries simples usam modelo barato
if complexity == "simple":
return "gpt-3.5-turbo"
else:
return "gpt-4"
def estimate_cost(
self,
model: str,
input_tokens: int,
output_tokens: int
) -> float:
"""Estima custo da chamada"""
input_price, output_price = self.PRICING[model]
return (input_tokens / 1000 * input_price) + (output_tokens / 1000 * output_price)
# ============================================================================
# Sistema Completo com Cache + Routing
# ============================================================================
class OptimizedLLMClient:
"""Cliente LLM otimizado com cache e routing"""
def __init__(self):
self.cache = LLMCache()
self.router = ModelRouter()
self.total_cost = 0.0
def generate(self, prompt: str, config: Dict = None) -> Dict[str, Any]:
"""Gera resposta com otimizações"""
config = config or {}
start_time = time.time()
# 1. Tenta cache
cached = self.cache.get(prompt, config)
if cached:
return {
"response": cached,
"source": "cache",
"latency_ms": (time.time() - start_time) * 1000,
"cost_usd": 0.0,
"model": "cached"
}
# 2. Seleciona modelo
model = self.router.select_model(prompt)
config["model"] = model
# 3. Chama LLM
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**config
)
result = response.choices[0].message.content
usage = response.usage
# 4. Calcula custo
cost = self.router.estimate_cost(
model,
usage.prompt_tokens,
usage.completion_tokens
)
self.total_cost += cost
# 5. Salva no cache
self.cache.set(prompt, config, result, ttl=3600)
latency = (time.time() - start_time) * 1000
return {
"response": result,
"source": "llm",
"latency_ms": latency,
"cost_usd": cost,
"model": model,
"tokens": usage.total_tokens
}
def get_stats(self) -> Dict[str, Any]:
"""Retorna estatísticas agregadas"""
cache_stats = self.cache.get_stats()
return {
"total_cost_usd": self.total_cost,
"cache_hit_rate": cache_stats["hit_rate"],
"cache_hits": cache_stats["hits"],
"cache_misses": cache_stats["misses"]
}
# ============================================================================
# Benchmark
# ============================================================================
client = OptimizedLLMClient()
queries = [
"O que é Python?", # Simple → GPT-3.5
"O que é Python?", # Cache hit
"Compare Python e JavaScript em termos de performance, ecosistema e casos de uso", # Complex → GPT-4
"Liste 3 vantagens do Python", # Simple → GPT-3.5
"O que é Python?", # Cache hit novamente
]
print("### BENCHMARK: CACHE + ROUTING ###\n")
for i, query in enumerate(queries, 1):
print(f"\nQuery {i}: {query[:50]}...")
result = client.generate(query, {"temperature": 0.7, "max_tokens": 150})
print(f" Modelo: {result['model']}")
print(f" Source: {result['source']}")
print(f" Latência: {result['latency_ms']:.2f}ms")
print(f" Custo: ${result['cost_usd']:.6f}")
print("\n" + "=" * 80)
stats = client.get_stats()
print(f"\n### ESTATÍSTICAS FINAIS ###")
print(f"Custo total: ${stats['total_cost_usd']:.6f}")
print(f"Cache hit rate: {stats['cache_hit_rate']:.2%}")
print(f"Cache hits: {stats['cache_hits']}")
print(f"Cache misses: {stats['cache_misses']}")
# Comparação sem otimizações
print("\n### COMPARAÇÃO: SEM OTIMIZAÇÕES ###")
print("Se todas as queries usassem GPT-4 sem cache:")
print(" Custo estimado: ~$0.05")
print(f" Economia: {(1 - stats['total_cost_usd'] / 0.05) * 100:.1f}%")Desafios:
- Implemente semantic caching usando embeddings para encontrar queries similares.
- Adicione batch processing para processar múltiplas queries em uma única chamada.
- Implemente load balancing entre múltiplos providers (OpenAI, Anthropic, Azure).
Reflexão:
- Em que cenários o caching é inapropriado (ex: dados em tempo real)?
- Como você lidaria com cache invalidation quando modelos são atualizados?
- Quais são os trade-offs entre caching agressivo e frescor das respostas?
Exercício 4: Observabilidade e Monitoramento
Objetivo: Instrumentar aplicação LLM com métricas (Prometheus), tracing distribuído (OpenTelemetry) e logging estruturado.
Por que? Em produção, você precisa de visibilidade em tempo real de latência, custos, taxa de erro e qualidade. Este exercício mostra como construir observabilidade robusta.
Código:
# uv pip install openai prometheus-client opentelemetry-api opentelemetry-sdk structlog
import os
import time
import openai
import structlog
from typing import Dict, Any
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
# ============================================================================
# Configuração de Observabilidade
# ============================================================================
# Prometheus Metrics
llm_requests_total = Counter(
'llm_requests_total',
'Total de requisições LLM',
['model', 'status']
)
llm_request_duration = Histogram(
'llm_request_duration_seconds',
'Latência de requisições LLM',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
llm_tokens_total = Counter(
'llm_tokens_total',
'Total de tokens processados',
['model', 'token_type']
)
llm_cost_usd_total = Counter(
'llm_cost_usd_total',
'Custo acumulado em USD',
['model']
)
llm_cache_hit_rate = Gauge(
'llm_cache_hit_rate',
'Taxa de cache hit'
)
# OpenTelemetry Tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
span_processor = BatchSpanProcessor(ConsoleSpanExporter())
trace.get_tracer_provider().add_span_processor(span_processor)
# Structured Logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
# ============================================================================
# Cliente LLM Instrumentado
# ============================================================================
class InstrumentedLLMClient:
"""Cliente LLM com observabilidade completa"""
PRICING = {
"gpt-4": (0.03, 0.06),
"gpt-3.5-turbo": (0.0015, 0.002),
}
def __init__(self):
openai.api_key = os.getenv("OPENAI_API_KEY")
self.cache_hits = 0
self.cache_misses = 0
def generate(
self,
prompt: str,
model: str = "gpt-3.5-turbo",
**kwargs
) -> Dict[str, Any]:
"""Gera resposta com instrumentação completa"""
# Inicia span para tracing distribuído
with tracer.start_as_current_span("llm_generate") as span:
span.set_attribute("model", model)
span.set_attribute("prompt_length", len(prompt))
# Log estruturado: início da requisição
log = logger.bind(model=model, prompt_length=len(prompt))
log.info("llm_request_started")
start_time = time.time()
status = "success"
try:
# Chamada LLM
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**kwargs
)
result = response.choices[0].message.content
usage = response.usage
# Calcula métricas
duration = time.time() - start_time
cost = self._calculate_cost(
model,
usage.prompt_tokens,
usage.completion_tokens
)
# Atualiza métricas Prometheus
llm_requests_total.labels(model=model, status=status).inc()
llm_request_duration.labels(model=model).observe(duration)
llm_tokens_total.labels(model=model, token_type="input").inc(usage.prompt_tokens)
llm_tokens_total.labels(model=model, token_type="output").inc(usage.completion_tokens)
llm_cost_usd_total.labels(model=model).inc(cost)
# Atualiza span
span.set_attribute("status", "success")
span.set_attribute("duration_seconds", duration)
span.set_attribute("tokens_total", usage.total_tokens)
span.set_attribute("cost_usd", cost)
# Log estruturado: sucesso
log.info(
"llm_request_completed",
duration_seconds=duration,
input_tokens=usage.prompt_tokens,
output_tokens=usage.completion_tokens,
cost_usd=cost
)
return {
"response": result,
"duration_seconds": duration,
"tokens": usage.total_tokens,
"cost_usd": cost,
"status": "success"
}
except openai.error.RateLimitError as e:
status = "rate_limited"
duration = time.time() - start_time
llm_requests_total.labels(model=model, status=status).inc()
llm_request_duration.labels(model=model).observe(duration)
span.set_attribute("status", "rate_limited")
log.error("llm_request_rate_limited", error=str(e))
raise
except Exception as e:
status = "error"
duration = time.time() - start_time
llm_requests_total.labels(model=model, status=status).inc()
llm_request_duration.labels(model=model).observe(duration)
span.set_attribute("status", "error")
span.set_attribute("error_message", str(e))
log.error("llm_request_failed", error=str(e), exc_info=True)
raise
def _calculate_cost(
self,
model: str,
input_tokens: int,
output_tokens: int
) -> float:
"""Calcula custo da chamada"""
input_price, output_price = self.PRICING.get(model, (0, 0))
return (input_tokens / 1000 * input_price) + (output_tokens / 1000 * output_price)
# ============================================================================
# Demonstração
# ============================================================================
# Inicia servidor Prometheus (métricas expostas em http://localhost:8000)
start_http_server(8000)
print("Prometheus metrics disponíveis em http://localhost:8000")
client = InstrumentedLLMClient()
# Simula carga de trabalho
queries = [
("Explique machine learning em 2 parágrafos", "gpt-3.5-turbo"),
("O que é fotossíntese?", "gpt-3.5-turbo"),
("Compare Python vs Java", "gpt-4"),
]
print("\n### EXECUÇÃO COM OBSERVABILIDADE ###\n")
for prompt, model in queries:
print(f"\nQuery: {prompt[:50]}...")
result = client.generate(prompt, model=model, temperature=0.7, max_tokens=150)
print(f" Status: {result['status']}")
print(f" Latência: {result['duration_seconds']:.2f}s")
print(f" Tokens: {result['tokens']}")
print(f" Custo: ${result['cost_usd']:.6f}")
print("\n" + "=" * 80)
print("\nMétricas Prometheus exportadas em http://localhost:8000/metrics")
print("Exemplo de queries PromQL úteis:")
print(" - Taxa de requisições: rate(llm_requests_total[5m])")
print(" - Latência P95: histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))")
print(" - Custo/hora: rate(llm_cost_usd_total[1h]) * 3600")Desafios:
- Configure Grafana para visualizar as métricas Prometheus em um dashboard.
- Implemente alertas para latência P95 > 3s ou taxa de erro > 1%.
- Adicione sampling de requests para inspecionar prompts/respostas problemáticas.
Reflexão:
- Quais métricas são mais críticas para detectar degradação de qualidade?
- Como você balancearia observabilidade completa vs. overhead de performance?
- Como implementar monitoramento de custos em tempo real para evitar gastos excessivos?
Recursos Adicionais
- Documentação OpenAI: https://platform.openai.com/docs
- Prometheus Best Practices: https://prometheus.io/docs/practices/naming/
- OpenTelemetry Python: https://opentelemetry.io/docs/languages/python/
- Redis Caching Patterns: https://redis.io/docs/manual/patterns/
- LLM Cost Tracking: https://github.com/BerriAI/litellm (biblioteca útil)