openaivec

AI text processing for pandas and Spark. Apply one prompt to many rows with automatic batching and caching.

Contributor guidelines

Quick start

pip install openaivec

Apply one prompt to many values:

import os
import pandas as pd
from openaivec import pandas_ext

os.environ["OPENAI_API_KEY"] = "your-api-key"

fruits = pd.Series(["apple", "banana", "cherry"])
french_names = fruits.ai.responses("Translate this fruit name to French.")
print(french_names.tolist())
# ['pomme', 'banane', 'cerise']

For Azure OpenAI and custom client setup, see pandas authentication options.

Pandas tutorial (GitHub Pages): https://microsoft.github.io/openaivec/examples/pandas/

Benchmarks

Simple task benchmark from benchmark.ipynb (100 numeric strings → integer literals, Series.aio.responses, model gpt-5.1):

| Mode                | Settings                                    | Time (s) |
|---------------------|---------------------------------------------|----------|
| Serial              | batch_size=1, max_concurrency=1             | ~141     |
| Batching            | default batch_size, max_concurrency=1       | ~15      |
| Concurrent batching | default batch_size, default max_concurrency | ~6       |

Batching alone removes most of the HTTP overhead, and adding concurrency on top cuts total runtime to a few seconds while still yielding one output per input.
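The mechanics behind these numbers can be sketched without calling the API: group inputs into batches, then run the batches concurrently under a semaphore. This is an illustrative pattern only, not openaivec's internals; `process_batched` and `fake_api` are made up for the sketch.

```python
import asyncio

async def process_batched(items, handle_batch, batch_size=8, max_concurrency=4):
    """Split items into batches and run the batches concurrently, preserving order."""
    sem = asyncio.Semaphore(max_concurrency)
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    async def run(batch):
        async with sem:  # bound the number of in-flight "requests"
            return await handle_batch(batch)

    # gather() returns results in submission order, so outputs line up with inputs
    batch_results = await asyncio.gather(*(run(b) for b in batches))
    return [item for batch in batch_results for item in batch]

async def fake_api(batch):
    # Stand-in for one HTTP round-trip that answers a whole batch at once
    await asyncio.sleep(0.01)
    return [s.upper() for s in batch]

outputs = asyncio.run(process_batched([f"row{i}" for i in range(20)], fake_api))
```

With batch_size=1 and max_concurrency=1 this degenerates to the serial case in the table; raising both is what produces the speedup.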



Why openaivec?

  • Drop-in .ai and .aio accessors keep pandas analysts in familiar tooling.
  • OpenAI batch-optimized: BatchCache/AsyncBatchCache coalesce requests, dedupe prompts, preserve order, and release waiters on failure.
  • Reasoning support mirrors the OpenAI SDK; structured outputs accept Pydantic response_format.
  • Built-in caches and retries remove boilerplate; pandas and async helpers can share caches explicitly, while Spark UDFs dedupe repeated inputs within each partition.
  • Spark UDFs, DuckDB integration, and Microsoft Fabric guides move notebooks into production-scale ETL.
  • Prompt tooling (FewShotPromptBuilder, improve) and the task library ship curated prompts with validated outputs.

Overview

openaivec vectorizes OpenAI calls so you handle many inputs per request instead of one at a time. Batching proxies dedupe inputs, enforce ordered outputs, and unblock waiters even on upstream errors. Shared-cache helpers reuse expensive prompts across pandas and async flows, while Spark UDF builders dedupe repeated inputs within each partition. Reasoning models honor SDK semantics. Requires Python 3.10+.

Core Workflows

Direct API usage

For maximum control over batch processing:

import os
from openai import OpenAI
from openaivec import BatchResponses

# Initialize the batch client
client = BatchResponses.of(
    client=OpenAI(),
    model_name="gpt-5.1",
    system_message="Please answer only with 'xx family' and do not output anything else.",
    # batch_size defaults to None (automatic optimization)
)

result = client.parse(
    ["panda", "rabbit", "koala"],
    reasoning={"effort": "none"},
)
print(result)  # Expected output: ['bear family', 'rabbit family', 'koala family']
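Structured outputs accept a Pydantic response_format (see Why openaivec?). The sketch below only shows what such a schema and a parsed result look like; `FamilyAnswer` is an illustrative model, and the payload is validated locally with no API call.

```python
from pydantic import BaseModel

class FamilyAnswer(BaseModel):
    """Illustrative schema for a structured answer."""
    name: str
    family: str

# A parsed structured result gives typed attribute access instead of a raw string.
answer = FamilyAnswer.model_validate({"name": "panda", "family": "bear family"})
```

Passing such a model as response_format is what lets downstream code read fields instead of parsing free text.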

📓 Complete tutorial →

pandas authentication options

Configure authentication once before using .ai or .aio.

OpenAI (API key)

import os

os.environ["OPENAI_API_KEY"] = "your-openai-api-key"

Azure OpenAI (API key)

import os

os.environ["AZURE_OPENAI_API_KEY"] = "your-azure-openai-api-key"
os.environ["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
os.environ["AZURE_OPENAI_API_VERSION"] = "v1"

Use AZURE_OPENAI_API_VERSION="v1" together with the /openai/v1/ base URL.

Azure OpenAI with Entra ID (no API key)

import os

os.environ["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
os.environ["AZURE_OPENAI_API_VERSION"] = "v1"
os.environ.pop("AZURE_OPENAI_API_KEY", None)

openaivec uses DefaultAzureCredential when AZURE_OPENAI_API_KEY is not set.

Custom clients (optional)

import openaivec
from openai import AsyncOpenAI, OpenAI
from openaivec import pandas_ext

openaivec.set_client(OpenAI())
openaivec.set_async_client(AsyncOpenAI())

pandas integration (recommended)

The easiest way to get started with your DataFrames (after authentication):

import openaivec
import pandas as pd
from openaivec import pandas_ext

openaivec.set_responses_model("gpt-5.1")

df = pd.DataFrame({"name": ["panda", "rabbit", "koala"]})

result = df.assign(
    family=lambda df: df.name.ai.responses(
        "What animal family? Answer with 'X family'",
        reasoning={"effort": "none"},
    )
)
# result:
#      name            family
# 0   panda       bear family
# 1  rabbit     rabbit family
# 2   koala  marsupial family

📓 Interactive pandas examples →

Using with reasoning models

Reasoning models (o1-preview, o1-mini, o3-mini, etc.) follow OpenAI SDK semantics. Pass reasoning when you want to override model defaults.

import openaivec

openaivec.set_responses_model("o1-mini")  # Set your reasoning model

result = df.assign(
    analysis=lambda df: df.text.ai.responses(
        "Analyze this text step by step",
        reasoning={"effort": "none"},  # Optional: mirrors the OpenAI SDK argument
    )
)

You can omit reasoning to use the model defaults or tune it per request with the same shape (dict with effort) as the OpenAI SDK.

Using pre-configured tasks

For common text processing operations, openaivec provides ready-to-use tasks that eliminate the need to write custom prompts:

from openaivec.task import nlp, customer_support

text_df = pd.DataFrame({
    "text": [
        "Great product, fast delivery!",
        "Need help with billing issue",
        "How do I reset my password?"
    ]
})

results = text_df.assign(
    sentiment=lambda df: df.text.ai.task(
        nlp.sentiment_analysis(),
        reasoning={"effort": "none"},
    ),
    intent=lambda df: df.text.ai.task(
        customer_support.intent_analysis(),
        reasoning={"effort": "none"},
    ),
)

# Extract structured results into separate columns
extracted_results = results.ai.extract("sentiment")
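Conceptually, extract flattens one struct field per column. A plain-pandas sketch of the idea, with dicts standing in for parsed task results (this is not openaivec's implementation, and the exact column naming may differ):

```python
import pandas as pd

df = pd.DataFrame({
    "text": ["Great product, fast delivery!", "Need help with billing issue"],
    "sentiment": [
        {"label": "positive", "confidence": 0.97},
        {"label": "negative", "confidence": 0.60},
    ],
})

# One column per struct field, prefixed with the source column name
flat = df.drop(columns=["sentiment"]).join(
    pd.json_normalize(df["sentiment"].tolist()).add_prefix("sentiment_")
)
```

Either way, the result is ordinary scalar columns you can filter, group, and aggregate with standard pandas.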

Asynchronous processing with .aio

For high-throughput workloads, the .aio accessor provides async versions of all operations:

import asyncio
import openaivec
import pandas as pd
from openaivec import pandas_ext

openaivec.set_responses_model("gpt-5.1")

df = pd.DataFrame({"text": [
    "This product is amazing!",
    "Terrible customer service",
    "Good value for money",
    "Not what I expected"
] * 250})  # 1000 rows for demonstration

async def process_data():
    return await df["text"].aio.responses(
        "Analyze sentiment and classify as positive/negative/neutral",
        reasoning={"effort": "none"},  # Recommended for reasoning models
        max_concurrency=12    # Allow up to 12 concurrent requests
    )

sentiments = asyncio.run(process_data())

Performance benefits: Parallel processing with automatic batching/deduplication, built-in rate limiting and error handling, and memory-efficient streaming for large datasets.

Using with Apache Spark UDFs

Scale to enterprise datasets with distributed processing.

📓 Spark tutorial →

Spark authentication options

Choose one setup path before registering UDFs.

OpenAI (API key)

from pyspark.sql import SparkSession
from openaivec.spark_ext import setup

spark = SparkSession.builder.getOrCreate()
setup(
    spark,
    api_key="your-openai-api-key",
    responses_model_name="gpt-5.1",
    embeddings_model_name="text-embedding-3-small",
)

Azure OpenAI (API key)

from pyspark.sql import SparkSession
from openaivec.spark_ext import setup_azure

spark = SparkSession.builder.getOrCreate()
setup_azure(
    spark,
    api_key="your-azure-openai-api-key",
    base_url="https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/",
    api_version="v1",
    responses_model_name="my-gpt-deployment",
    embeddings_model_name="my-embedding-deployment",
)

Use api_version="v1" with a base URL that ends in /openai/v1/.

Azure OpenAI with Entra ID (no API key)

import os

os.environ["AZURE_OPENAI_BASE_URL"] = "https://YOUR-RESOURCE-NAME.services.ai.azure.com/openai/v1/"
os.environ["AZURE_OPENAI_API_VERSION"] = "v1"
os.environ.pop("AZURE_OPENAI_API_KEY", None)

openaivec uses DefaultAzureCredential when AZURE_OPENAI_API_KEY is not set.

Create and register UDFs using the provided helpers:

from openaivec.spark_ext import responses_udf

spark.udf.register(
    "extract_brand",
    responses_udf(
        instructions="Extract the brand name from the product. Return only the brand name.",
        reasoning={"effort": "none"},
    )
)

products = spark.createDataFrame(
    [("Nike Air Max",), ("Apple iPhone 15",)],
    ["product_name"],
)
products.selectExpr("product_name", "extract_brand(product_name) AS brand").show()

Other helper UDFs are available: task_udf, embeddings_udf, count_tokens_udf, similarity_udf, and parse_udf.

Spark performance tips

  • Duplicate detection automatically caches repeated inputs per partition for UDFs.
  • batch_size=None auto-optimizes; set 32–128 for fixed sizes if needed.
  • max_concurrency is per executor; total concurrency = executors × max_concurrency. Start with 4–12.
  • Monitor rate limits and adjust concurrency to your OpenAI tier.
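The per-executor point is worth making concrete: total in-flight requests scale with cluster size, so size max_concurrency against your OpenAI rate limit with this arithmetic (the numbers here are illustrative):

```python
executors = 8             # Spark executors in the cluster (illustrative)
max_concurrency = 12      # per-executor concurrency configured on the UDF

# Worst case, every executor has max_concurrency requests in flight at once
total_in_flight = executors * max_concurrency
```

If 96 concurrent requests would exceed your tier's limit, lower max_concurrency rather than the executor count.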

Using with DuckDB

Register AI-powered functions as DuckDB UDFs and use them in pure SQL. Structured outputs are returned as native STRUCT types with direct field access.

import openaivec
import duckdb
from pydantic import BaseModel
from typing import Literal
from openaivec import duckdb_ext

openaivec.set_responses_model("gpt-5.4")


class Sentiment(BaseModel):
    label: Literal["positive", "negative", "neutral"]
    confidence: float
    summary: str


conn = duckdb.connect()

duckdb_ext.responses_udf(
    conn,
    "analyze_sentiment",
    instructions="Analyze customer sentiment. Return label, confidence (0-1), and a one-sentence summary.",
    response_format=Sentiment,
)

# Query CSV directly: structured fields, no JSON parsing
conn.sql("""
    SELECT
        customer,
        analyze_sentiment(response).label      AS sentiment,
        analyze_sentiment(response).confidence AS confidence,
        analyze_sentiment(response).summary    AS summary
    FROM 'survey.csv'
""")

# Aggregate with standard SQL
conn.sql("""
    WITH results AS (
        SELECT analyze_sentiment(response).label AS sentiment
        FROM 'survey.csv'
    )
    SELECT sentiment, COUNT(*) AS count
    FROM results
    GROUP BY sentiment
""")

Embedding UDFs work the same way:

duckdb_ext.embeddings_udf(conn, "embed")

conn.sql("""
    SELECT a.text, list_cosine_similarity(embed(a.text), embed(b.text)) AS similarity
    FROM docs a, queries b
""")
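DuckDB's list_cosine_similarity computes the standard cosine similarity between the two embedding vectors. In plain Python, the same quantity is:

```python
import math

def cosine_similarity(a, b):
    # dot(a, b) / (|a| * |b|): 1.0 for identical directions, 0.0 for orthogonal ones
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

same = cosine_similarity([1.0, 0.0], [1.0, 0.0])        # identical direction
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])  # unrelated direction
```

Because it depends only on direction, not magnitude, it is the usual choice for comparing embeddings.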

All UDFs use Arrow vectorized execution: DuckDB sends batches of rows that are processed with async concurrency and automatic deduplication.

📓 DuckDB tutorial →

Building Prompts

Few-shot prompts improve LLM quality. FewShotPromptBuilder structures purpose, cautions, and examples; improve() iterates with OpenAI to remove contradictions.

from openaivec import FewShotPromptBuilder

prompt = (
    FewShotPromptBuilder()
    .purpose("Return the smallest category that includes the given word")
    .caution("Never use proper nouns as categories")
    .example("Apple", "Fruit")
    .example("Car", "Vehicle")
    .improve(max_iter=1)  # optional
    .build()
)

📓 Advanced prompting techniques →

Using with Microsoft Fabric

Microsoft Fabric is a unified, cloud-based analytics platform. Add openaivec from PyPI in your Fabric environment, select it in your notebook, and use openaivec.spark_ext like standard Spark.

Contributing

We welcome contributions! Please:

  1. Fork and branch from main.
  2. Add or update tests when you change code.
  3. Run formatting and tests before opening a PR.

Install dev deps:

uv sync --all-extras --dev

Lint and format:

uv run ruff check . --fix

Quick test pass:

uv run pytest -m "not slow and not requires_api"

Additional Resources

📓 Customer feedback analysis → - Sentiment analysis & prioritization
📓 Survey data transformation → - Unstructured to structured data
📓 Asynchronous processing examples → - High-performance async workflows
📓 Auto-generate FAQs from documents → - Create FAQs using AI
📓 All examples → - Complete collection of tutorials and use cases

Community

Join our Discord community for support and announcements: https://discord.gg/hXCS9J6Qek
