Skip to content

API Reference

Bases: genkit.ai._base.GenkitBase

Genkit asyncio user-facing API.

Source code in packages/genkit/src/genkit/ai/_aio.py
class Genkit(GenkitBase):
    """Genkit asyncio user-facing API.

    Exposes coroutine-based entry points (`generate`, `embed`, `retrieve`)
    and a streaming helper (`generate_stream`) on top of `GenkitBase`.
    """

    def __init__(
        self,
        plugins: list[Plugin] | None = None,
        model: str | None = None,
        reflection_server_spec: ServerSpec | None = None,
    ) -> None:
        """Initialize a new Genkit instance.

        All arguments are forwarded unchanged to `GenkitBase.__init__`.

        Args:
            plugins: List of plugins to initialize.
            model: Model name to use.
            reflection_server_spec: Server spec for the reflection
                server.
        """
        super().__init__(plugins=plugins, model=model, reflection_server_spec=reflection_server_spec)

    async def generate(
        self,
        model: str | None = None,
        prompt: str | Part | list[Part] | None = None,
        system: str | Part | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: list[str] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        tool_responses: list[Part] | None = None,
        config: GenerationCommonConfig | dict[str, Any] | None = None,
        max_turns: int | None = None,
        on_chunk: ModelStreamingCallback | None = None,
        context: dict[str, Any] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: bool | str | None = None,
        output_schema: type | dict[str, Any] | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[DocumentData] | None = None,
    ) -> GenerateResponseWrapper:
        """Generates text or structured data using a language model.

        This function provides a flexible interface for interacting with various
        language models, supporting both simple text generation and more complex
        interactions involving tools and structured conversations.

        Args:
            model: Optional. The name of the model to use for generation. If not
                provided, a default model may be used.
            prompt: Optional. A single prompt string, a `Part` object, or a list
                of `Part` objects to provide as input to the model. This is used
                for simple text generation.
            system: Optional. A system message string, a `Part` object, or a
                list of `Part` objects to provide context or instructions to
                the model, especially for chat-based models.
            messages: Optional. A list of `Message` objects representing a
                conversation history. This is used for chat-based models to
                maintain context.
            tools: Optional. A list of tool names (strings) that the model can
                use.
            return_tool_requests: Optional. If `True`, the model will return
                tool requests instead of executing them directly.
            tool_choice: Optional. A `ToolChoice` object specifying how the
                model should choose which tool to use.
            tool_responses: Optional. tool_responses should contain a list of
                tool response parts corresponding to interrupt tool request
                parts from the most recent model message. Each entry must have
                a matching `name` and `ref` (if supplied) for its tool request
                counterpart.
            config: Optional. A `GenerationCommonConfig` object or a dictionary
                containing configuration parameters for the generation process.
                This allows fine-tuning the model's behavior.
            max_turns: Optional. The maximum number of turns in a conversation.
            on_chunk: Optional. A callback function of type
                `ModelStreamingCallback` that is called for each chunk of
                generated text during streaming.
            context: Optional. A dictionary containing additional context
                information that can be used during generation. When falsy
                (None or empty), the current action run context is used.
            output_format: Optional. The format to use for the output (e.g.,
                'json').
            output_content_type: Optional. The content type of the output.
            output_instructions: Optional. Instructions for formatting the
                output.
            output_schema: Optional. Schema defining the structure of the
                output.
            output_constrained: Optional. Whether to constrain the output to the
                schema.
            use: Optional. A list of `ModelMiddleware` functions to apply to the
                generation process. Middleware can be used to intercept and
                modify requests and responses.
            docs: Optional. A list of documents to be used for grounding.

        Returns:
            A `GenerateResponseWrapper` object containing the model's response,
            which may include generated text, tool requests, or other relevant
            information.

        Note:
            - The `tools`, `return_tool_requests`, and `tool_choice` arguments
              are used for models that support tool usage.
            - The `on_chunk` argument enables streaming responses, allowing you
              to process the generated content as it becomes available.
        """
        return await generate_action(
            self.registry,
            to_generate_action_options(
                registry=self.registry,
                model=model,
                prompt=prompt,
                system=system,
                messages=messages,
                tools=tools,
                return_tool_requests=return_tool_requests,
                tool_choice=tool_choice,
                tool_responses=tool_responses,
                config=config,
                max_turns=max_turns,
                output_format=output_format,
                output_content_type=output_content_type,
                output_instructions=output_instructions,
                output_schema=output_schema,
                output_constrained=output_constrained,
                docs=docs,
            ),
            on_chunk=on_chunk,
            middleware=use,
            # A falsy context (None or {}) falls back to the ambient context.
            context=context if context else ActionRunContext._current_context(),
        )

    def generate_stream(
        self,
        model: str | None = None,
        prompt: str | Part | list[Part] | None = None,
        system: str | Part | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: list[str] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        config: GenerationCommonConfig | dict[str, Any] | None = None,
        max_turns: int | None = None,
        context: dict[str, Any] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: bool | str | None = None,
        output_schema: type | dict[str, Any] | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[DocumentData] | None = None,
        timeout: float | None = None,
        tool_responses: list[Part] | None = None,
    ) -> tuple[
        AsyncIterator[GenerateResponseChunkWrapper],
        Future[GenerateResponseWrapper],
    ]:
        """Streams generated text or structured data using a language model.

        This is the streaming counterpart of `generate`: it calls `generate`
        with an `on_chunk` callback that pushes every chunk into a `Channel`,
        and returns that channel together with its `closed` future.

        Args:
            model: Optional. The name of the model to use for generation. If not
                provided, a default model may be used.
            prompt: Optional. A single prompt string, a `Part` object, or a list
                of `Part` objects to provide as input to the model. This is used
                for simple text generation.
            system: Optional. A system message string, a `Part` object, or a
                list of `Part` objects to provide context or instructions to the
                model, especially for chat-based models.
            messages: Optional. A list of `Message` objects representing a
                conversation history. This is used for chat-based models to
                maintain context.
            tools: Optional. A list of tool names (strings) that the model can
                use.
            return_tool_requests: Optional. If `True`, the model will return
                tool requests instead of executing them directly.
            tool_choice: Optional. A `ToolChoice` object specifying how the
                model should choose which tool to use.
            config: Optional. A `GenerationCommonConfig` object or a dictionary
                containing configuration parameters for the generation process.
                This allows fine-tuning the model's behavior.
            max_turns: Optional. The maximum number of turns in a conversation.
            context: Optional. A dictionary containing additional context
                information that can be used during generation.
            output_format: Optional. The format to use for the output (e.g.,
                'json').
            output_content_type: Optional. The content type of the output.
            output_instructions: Optional. Instructions for formatting the
                output.
            output_schema: Optional. Schema defining the structure of the
                output.
            output_constrained: Optional. Whether to constrain the output to the
                schema.
            use: Optional. A list of `ModelMiddleware` functions to apply to the
                generation process. Middleware can be used to intercept and
                modify requests and responses.
            docs: Optional. A list of documents to be used for grounding.
            timeout: Optional. The timeout for the streaming action; passed to
                the `Channel` that carries the chunks.
            tool_responses: Optional. Tool response parts corresponding to
                interrupt tool request parts from the most recent model
                message; forwarded unchanged to `generate`.

        Returns:
            A 2-tuple `(stream, response)`:

            - `stream`: an async iterator yielding a
              `GenerateResponseChunkWrapper` for each chunk sent by the model.
            - `response`: a future for the final `GenerateResponseWrapper`
              (the channel's `closed` future).

        Note:
            - The `tools`, `return_tool_requests`, and `tool_choice` arguments
              are used for models that support tool usage.
            - This method takes no `on_chunk` argument; chunks are delivered
              through the returned stream instead.
        """
        stream = Channel(timeout=timeout)

        # `generate` is a coroutine function, so this only creates the
        # coroutine object; it is handed to the channel below rather than
        # awaited here.
        resp = self.generate(
            model=model,
            prompt=prompt,
            system=system,
            messages=messages,
            tools=tools,
            return_tool_requests=return_tool_requests,
            tool_choice=tool_choice,
            tool_responses=tool_responses,
            config=config,
            max_turns=max_turns,
            context=context,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_schema=output_schema,
            output_constrained=output_constrained,
            docs=docs,
            use=use,
            on_chunk=lambda c: stream.send(c),
        )
        stream.set_close_future(resp)

        return (stream, stream.closed)

    async def embed(
        self,
        embedder: str | None = None,
        documents: list[Document] | None = None,
        options: dict[str, Any] | None = None,
    ) -> EmbedResponse:
        """Calculates embeddings for documents.

        Args:
            embedder: Optional embedder model name to use.
            documents: Documents to embed.
            options: Embedding options.

        Returns:
            The `response` of the embedder action run, containing the
            embeddings.
        """
        embed_action = self.registry.lookup_action(ActionKind.EMBEDDER, embedder)

        return (await embed_action.arun(EmbedRequest(input=documents, options=options))).response

    async def retrieve(
        self,
        retriever: str | None = None,
        query: str | DocumentData | None = None,
        options: dict[str, Any] | None = None,
    ) -> RetrieverResponse:
        """Retrieves documents based on query.

        Args:
            retriever: Optional retriever name to use.
            query: Text query or a DocumentData containing query text. A plain
                string is converted with `Document.from_text` first.
            options: Retriever options.

        Returns:
            The `response` of the retriever action run.
        """
        if isinstance(query, str):
            query = Document.from_text(query)

        retrieve_action = self.registry.lookup_action(ActionKind.RETRIEVER, retriever)

        return (await retrieve_action.arun(RetrieverRequest(query=query, options=options))).response

__init__(plugins=None, model=None, reflection_server_spec=None)

Initialize a new Genkit instance.

Parameters:

Name Type Description Default
plugins list[genkit.ai._plugin.Plugin] | None

List of plugins to initialize.

None
model str | None

Model name to use.

None
reflection_server_spec genkit.ai._server.ServerSpec | None

Server spec for the reflection server.

None
Source code in packages/genkit/src/genkit/ai/_aio.py
def __init__(
    self,
    plugins: list[Plugin] | None = None,
    model: str | None = None,
    reflection_server_spec: ServerSpec | None = None,
) -> None:
    """Create a Genkit instance by delegating to the base-class initializer.

    Args:
        plugins: List of plugins to initialize.
        model: Model name to use.
        reflection_server_spec: Server spec for the reflection server.
    """
    # Every argument is passed through unchanged; nothing else is set here.
    super().__init__(
        plugins=plugins,
        model=model,
        reflection_server_spec=reflection_server_spec,
    )

define_batch_evaluator(name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Define a batch evaluator action.

This action runs the callback function on the entire dataset.

Parameters:

Name Type Description Default
name str

Name of the evaluator.

required
fn genkit.blocks.evaluator.BatchEvaluatorFn

Function implementing the evaluator behavior.

required
display_name str

User-visible display name

required
definition str

User-visible evaluator definition

required
is_billed bool

Whether the evaluator performs any billed actions (paid APIs, LLMs etc.)

False
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for evaluator configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the evaluator.

None
description str | None

Optional description for the evaluator.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_batch_evaluator(
    self,
    name: str,
    display_name: str,
    definition: str,
    fn: BatchEvaluatorFn,
    is_billed: bool = False,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Define a batch evaluator action.

    This action runs the callback function on the entire dataset.

    Args:
        name: Name of the evaluator.
        display_name: User-visible display name.
        definition: User-visible evaluator definition.
        fn: Function implementing the evaluator behavior.
        is_billed: Whether the evaluator performs any billed actions
            (paid APIs, LLMs etc.).
        config_schema: Optional schema for evaluator configuration.
        metadata: Optional metadata for the evaluator. The dict is copied
            (including its nested 'evaluator' entry) and never modified.
        description: Optional description for the evaluator.

    Returns:
        The value returned by `self.registry.register_action` for the new
        evaluator action.
    """
    # Build the metadata on copies so the caller's dict is left untouched.
    evaluator_meta: dict[str, Any] = dict(metadata) if metadata else {}
    evaluator_info: dict[str, Any] = dict(evaluator_meta.get('evaluator') or {})
    evaluator_info[EVALUATOR_METADATA_KEY_DEFINITION] = definition
    evaluator_info[EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
    evaluator_info[EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
    # A missing or empty label defaults to the evaluator name.
    if not evaluator_info.get('label'):
        evaluator_info['label'] = name
    if config_schema:
        evaluator_info['customOptions'] = to_json_schema(config_schema)
    evaluator_meta['evaluator'] = evaluator_info

    evaluator_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.EVALUATOR,
        fn=fn,
        metadata=evaluator_meta,
        description=evaluator_description,
    )

define_embedder(name, fn, metadata=None, description=None)

Define a custom embedder action.

Parameters:

Name Type Description Default
name str

Name of the embedder.

required
fn genkit.blocks.embedding.EmbedderFn

Function implementing the embedder behavior.

required
metadata dict[str, typing.Any] | None

Optional metadata for the embedder.

None
description str | None

Optional description for the embedder.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_embedder(
    self,
    name: str,
    fn: EmbedderFn,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Register a custom embedder action in the registry.

    Args:
        name: Name of the embedder.
        fn: Function implementing the embedder behavior.
        metadata: Optional metadata for the embedder; passed through as-is.
        description: Optional description for the embedder; resolved together
            with `fn` by `get_func_description`.

    Returns:
        The value returned by `self.registry.register_action`.
    """
    resolved_description = get_func_description(fn, description)
    registration = {
        'name': name,
        'kind': ActionKind.EMBEDDER,
        'fn': fn,
        'metadata': metadata,
        'description': resolved_description,
    }
    return self.registry.register_action(**registration)

define_evaluator(name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Define an evaluator action.

This action runs the callback function on every sample of the input dataset.

Parameters:

Name Type Description Default
name str

Name of the evaluator.

required
fn genkit.blocks.evaluator.EvaluatorFn

Function implementing the evaluator behavior.

required
display_name str

User-visible display name

required
definition str

User-visible evaluator definition

required
is_billed bool

Whether the evaluator performs any billed actions (paid APIs, LLMs etc.)

False
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for evaluator configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the evaluator.

None
description str | None

Optional description for the evaluator.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_evaluator(
    self,
    name: str,
    display_name: str,
    definition: str,
    fn: EvaluatorFn,
    is_billed: bool = False,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Define an evaluator action.

    This action runs the callback function on every sample of the input
    dataset. Each sample is evaluated inside its own span; if the callback
    raises for a sample, an error `Score` is recorded for that sample and
    processing continues with the next one.

    Args:
        name: Name of the evaluator.
        display_name: User-visible display name.
        definition: User-visible evaluator definition.
        fn: Function implementing the evaluator behavior. It is called as
            `fn(datapoint, req.options)` for each datapoint.
        is_billed: Whether the evaluator performs any billed actions
            (paid APIs, LLMs etc.).
        config_schema: Optional schema for evaluator configuration.
        metadata: Optional metadata for the evaluator. The dict is copied
            (including its nested 'evaluator' entry) and never modified.
        description: Optional description for the evaluator.

    Returns:
        The value returned by `self.registry.register_action` for the new
        evaluator action.
    """
    # Build the metadata on copies so the caller's dict is left untouched.
    evaluator_meta: dict[str, Any] = dict(metadata) if metadata else {}
    evaluator_info: dict[str, Any] = dict(evaluator_meta.get('evaluator') or {})
    evaluator_info[EVALUATOR_METADATA_KEY_DEFINITION] = definition
    evaluator_info[EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
    evaluator_info[EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
    # A missing or empty label defaults to the evaluator name.
    if not evaluator_info.get('label'):
        evaluator_info['label'] = name
    if config_schema:
        evaluator_info['customOptions'] = to_json_schema(config_schema)
    evaluator_meta['evaluator'] = evaluator_info

    evaluator_description = get_func_description(fn, description)

    def eval_stepper_fn(req: EvalRequest) -> EvalResponse:
        """Run `fn` over every datapoint of `req.dataset`, one span each."""
        eval_responses: list[EvalFnResponse] = []
        for datapoint in req.dataset:
            # Datapoints without an id get a random one, stored on the datapoint.
            if datapoint.test_case_id is None:
                datapoint.test_case_id = str(uuid.uuid4())
            span_metadata = SpanMetadata(
                name=f'Test Case {datapoint.test_case_id}',
                metadata={'evaluator:evalRunId': req.eval_run_id},
            )
            try:
                with run_in_new_span(span_metadata, labels={'genkit:type': 'evaluator'}) as span:
                    span_id = span.span_id()
                    trace_id = span.trace_id()
                    try:
                        span.set_input(datapoint)
                        test_case_output = fn(datapoint, req.options)
                        test_case_output.span_id = span_id
                        test_case_output.trace_id = trace_id
                        span.set_output(test_case_output)
                        eval_responses.append(test_case_output)
                    except Exception as e:
                        logger.debug(f'eval_stepper_fn error: {str(e)}')
                        logger.debug(traceback.format_exc())
                        evaluation = Score(
                            error=f'Evaluation of test case {datapoint.test_case_id} failed: \n{str(e)}'
                        )
                        eval_responses.append(
                            EvalFnResponse(
                                span_id=span_id,
                                trace_id=trace_id,
                                test_case_id=datapoint.test_case_id,
                                evaluation=evaluation,
                            )
                        )
                        # Re-raise (original traceback intact) to mark span as failed
                        raise
            except Exception:
                # The failure is already recorded above; continue to process
                # other points.
                continue
        return EvalResponse(eval_responses)

    return self.registry.register_action(
        name=name,
        kind=ActionKind.EVALUATOR,
        fn=eval_stepper_fn,
        metadata=evaluator_meta,
        description=evaluator_description,
    )

define_format(format)

Registers a custom format in the registry.

Parameters:

Name Type Description Default
format genkit.blocks.formats.types.FormatDef

The format to register.

required
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_format(self, format: FormatDef) -> None:
    """Store a custom format definition in the registry.

    The definition is registered through `self.registry.register_value` with
    the literal kind string 'format' and the definition's own `name`.

    Args:
        format: The format to register.
    """
    registry = self.registry
    registry.register_value('format', format.name, format)

define_model(name, fn, config_schema=None, metadata=None, info=None, description=None)

Define a custom model action.

Parameters:

Name Type Description Default
name str

Name of the model.

required
fn genkit.blocks.model.ModelFn

Function implementing the model behavior.

required
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for model configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the model.

None
info genkit.core.typing.ModelInfo | None

Optional ModelInfo for the model.

None
description str | None

Optional description for the model.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_model(
    self,
    name: str,
    fn: ModelFn,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    info: ModelInfo | None = None,
    description: str | None = None,
) -> Action:
    """Define a custom model action.

    Args:
        name: Name of the model.
        fn: Function implementing the model behavior.
        config_schema: Optional schema for model configuration.
        metadata: Optional metadata for the model. The dict is copied
            (including its nested 'model' entry) and never modified.
        info: Optional ModelInfo for the model. When given, its dumped form
            replaces any 'model' entry supplied in `metadata`.
        description: Optional description for the model.

    Returns:
        The value returned by `self.registry.register_action` for the new
        model action.
    """
    # Build the metadata on copies so the caller's dict is left untouched.
    model_meta: dict[str, Any] = dict(metadata) if metadata else {}
    if info:
        model_meta['model'] = dump_dict(info)
    else:
        model_meta['model'] = dict(model_meta.get('model') or {})
    # A missing or empty label defaults to the model name.
    if 'label' not in model_meta['model'] or not model_meta['model']['label']:
        model_meta['model']['label'] = name

    if config_schema:
        model_meta['model']['customOptions'] = to_json_schema(config_schema)

    model_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.MODEL,
        fn=fn,
        metadata=model_meta,
        description=model_description,
    )

define_prompt(variant=None, model=None, config=None, description=None, input_schema=None, system=None, prompt=None, messages=None, output_format=None, output_content_type=None, output_instructions=None, output_schema=None, output_constrained=None, max_turns=None, return_tool_requests=None, metadata=None, tools=None, tool_choice=None, use=None)

Define a prompt.

Parameters:

Name Type Description Default
variant str | None

Optional variant name for the prompt.

None
model str | None

Optional model name to use for the prompt.

None
config genkit.core.typing.GenerationCommonConfig | dict[str, typing.Any] | None

Optional configuration for the model.

None
description str | None

Optional description for the prompt.

None
input_schema type | dict[str, typing.Any] | None

Optional schema for the input to the prompt.

None
system str | genkit.core.typing.Part | list[genkit.core.typing.Part] | None

Optional system message for the prompt.

None
prompt str | genkit.core.typing.Part | list[genkit.core.typing.Part] | None

Optional prompt for the model.

None
messages str | list[genkit.core.typing.Message] | None

Optional messages for the model.

None
output_format str | None

Optional output format for the prompt.

None
output_content_type str | None

Optional output content type for the prompt.

None
output_instructions bool | str | None

Optional output instructions for the prompt.

None
output_schema type | dict[str, typing.Any] | None

Optional schema for the output from the prompt.

None
output_constrained bool | None

Optional flag indicating whether the output should be constrained.

None
max_turns int | None

Optional maximum number of turns for the prompt.

None
return_tool_requests bool | None

Optional flag indicating whether tool requests should be returned.

None
metadata dict[str, typing.Any] | None

Optional metadata for the prompt.

None
tools list[str] | None

Optional list of tools to use for the prompt.

None
tool_choice genkit.core.typing.ToolChoice | None

Optional tool choice for the prompt.

None
use list[genkit.blocks.model.ModelMiddleware] | None

Optional list of model middlewares to use for the prompt.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_prompt(
    self,
    variant: str | None = None,
    model: str | None = None,
    config: GenerationCommonConfig | dict[str, Any] | None = None,
    description: str | None = None,
    input_schema: type | dict[str, Any] | None = None,
    system: str | Part | list[Part] | None = None,
    prompt: str | Part | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: bool | str | None = None,
    output_schema: type | dict[str, Any] | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, Any] | None = None,
    tools: list[str] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    # TODO:
    #  docs: list[Document]
):
    """Define a prompt bound to this instance's registry.

    Every argument is forwarded by keyword, unchanged, to the module-level
    `define_prompt` function together with `self.registry`.

    Args:
        variant: Optional variant name for the prompt.
        model: Optional model name to use for the prompt.
        config: Optional configuration for the model.
        description: Optional description for the prompt.
        input_schema: Optional schema for the input to the prompt.
        system: Optional system message for the prompt.
        prompt: Optional prompt for the model.
        messages: Optional messages for the model.
        output_format: Optional output format for the prompt.
        output_content_type: Optional output content type for the prompt.
        output_instructions: Optional output instructions for the prompt.
        output_schema: Optional schema for the output from the prompt.
        output_constrained: Optional flag indicating whether the output
            should be constrained.
        max_turns: Optional maximum number of turns for the prompt.
        return_tool_requests: Optional flag indicating whether tool requests
            should be returned.
        metadata: Optional metadata for the prompt.
        tools: Optional list of tools to use for the prompt.
        tool_choice: Optional tool choice for the prompt.
        use: Optional list of model middlewares to use for the prompt.

    Returns:
        Whatever the module-level `define_prompt` returns.
    """
    prompt_options = {
        'variant': variant,
        'model': model,
        'config': config,
        'description': description,
        'input_schema': input_schema,
        'system': system,
        'prompt': prompt,
        'messages': messages,
        'output_format': output_format,
        'output_content_type': output_content_type,
        'output_instructions': output_instructions,
        'output_schema': output_schema,
        'output_constrained': output_constrained,
        'max_turns': max_turns,
        'return_tool_requests': return_tool_requests,
        'metadata': metadata,
        'tools': tools,
        'tool_choice': tool_choice,
        'use': use,
    }
    return define_prompt(self.registry, **prompt_options)

define_retriever(name, fn, config_schema=None, metadata=None, description=None)

Define a retriever action.

Parameters:

Name Type Description Default
name str

Name of the retriever.

required
fn genkit.blocks.retriever.RetrieverFn

Function implementing the retriever behavior.

required
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for retriever configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the retriever.

None
description str | None

Optional description for the retriever.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_retriever(
    self,
    name: str,
    fn: RetrieverFn,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Define a retriever action.

    Args:
        name: Name of the retriever.
        fn: Function implementing the retriever behavior.
        config_schema: Optional schema for retriever configuration.
        metadata: Optional metadata for the retriever. The dict is copied
            (including its nested 'retriever' entry) and never modified.
        description: Optional description for the retriever.

    Returns:
        The value returned by `self.registry.register_action` for the new
        retriever action.
    """
    # Build the metadata on copies so the caller's dict is left untouched.
    retriever_meta: dict[str, Any] = dict(metadata) if metadata else {}
    retriever_info: dict[str, Any] = dict(retriever_meta.get('retriever') or {})
    # A missing or empty label defaults to the retriever name.
    if not retriever_info.get('label'):
        retriever_info['label'] = name
    if config_schema:
        retriever_info['customOptions'] = to_json_schema(config_schema)
    retriever_meta['retriever'] = retriever_info

    retriever_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.RETRIEVER,
        fn=fn,
        metadata=retriever_meta,
        description=retriever_description,
    )

embed(embedder=None, documents=None, options=None) async

Calculates embeddings for documents.

Parameters:

Name Type Description Default
embedder str | None

Optional embedder model name to use.

None
documents list[genkit.blocks.document.Document] | None

Texts to embed.

None
options dict[str, typing.Any] | None

embedding options

None

Returns:

Type Description
genkit.blocks.embedding.EmbedResponse

The generated response with embeddings.

Source code in packages/genkit/src/genkit/ai/_aio.py
async def embed(
    self,
    embedder: str | None = None,
    documents: list[Document] | None = None,
    options: dict[str, Any] | None = None,
) -> EmbedResponse:
    """Compute embeddings for the given documents.

    Args:
        embedder: Optional name of the embedder action to look up in the
            registry.
        documents: Documents to embed; sent as the request ``input``.
        options: Optional embedding options, forwarded in the request.

    Returns:
        The ``response`` of the embedder action's run.
    """
    # Resolve the action first, then build the request as part of the call.
    action = self.registry.lookup_action(ActionKind.EMBEDDER, embedder)
    outcome = await action.arun(EmbedRequest(input=documents, options=options))
    return outcome.response

flow(name=None, description=None)

Decorator to register a function as a flow.

Parameters:

Name Type Description Default
name str | None

Optional name for the flow. If not provided, uses the function name.

None
description str | None

Optional description for the flow. If not provided, uses the function docstring.

None

Returns:

Type Description
collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]

A decorator function that registers the flow.

Source code in packages/genkit/src/genkit/ai/_registry.py
def flow(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
    """Build a decorator that registers the decorated function as a flow.

    Args:
        name: Name to register the flow under. Defaults to the decorated
            function's ``__name__``.
        description: Optional description for the flow; passed together
            with the function to ``get_func_description``.

    Returns:
        A decorator that registers the function as a flow action and
        returns a ``FlowWrapper`` around it.
    """

    def wrapper(func: Callable) -> Callable:
        """Register ``func`` as a flow and wrap it.

        Args:
            func: The function to register as a flow.

        Returns:
            A ``FlowWrapper`` holding the registered action and a callable
            that runs it.
        """
        registered_name = func.__name__ if name is None else name
        registered_description = get_func_description(func, description)
        registered_action = self.registry.register_action(
            name=registered_name,
            kind=ActionKind.FLOW,
            fn=func,
            description=registered_description,
            span_metadata={'genkit:metadata:flow:name': registered_name},
        )

        # Only the variant matching the action's sync/async nature is built;
        # either one runs the registered action and unwraps its response.
        if registered_action.is_async:

            @wraps(func)
            async def invoke(*args, **kwargs):
                outcome = await registered_action.arun(*args, **kwargs)
                return outcome.response

        else:

            @wraps(func)
            def invoke(*args, **kwargs):
                outcome = registered_action.run(*args, **kwargs)
                return outcome.response

        return FlowWrapper(fn=invoke, action=registered_action)

    return wrapper

generate(model=None, prompt=None, system=None, messages=None, tools=None, return_tool_requests=None, tool_choice=None, tool_responses=None, config=None, max_turns=None, on_chunk=None, context=None, output_format=None, output_content_type=None, output_instructions=None, output_schema=None, output_constrained=None, use=None, docs=None) async

Generates text or structured data using a language model.

This function provides a flexible interface for interacting with various language models, supporting both simple text generation and more complex interactions involving tools and structured conversations.

Parameters:

Name Type Description Default
model str | None

Optional. The name of the model to use for generation. If not provided, a default model may be used.

None
prompt str | genkit.types.Part | list[genkit.types.Part] | None

Optional. A single prompt string, a Part object, or a list of Part objects to provide as input to the model. This is used for simple text generation.

None
system str | genkit.types.Part | list[genkit.types.Part] | None

Optional. A system message string, a Part object, or a list of Part objects to provide context or instructions to the model, especially for chat-based models.

None
messages list[genkit.types.Message] | None

Optional. A list of Message objects representing a conversation history. This is used for chat-based models to maintain context.

None
tools list[str] | None

Optional. A list of tool names (strings) that the model can use.

None
return_tool_requests bool | None

Optional. If True, the model will return tool requests instead of executing them directly.

None
tool_choice genkit.types.ToolChoice

Optional. A ToolChoice object specifying how the model should choose which tool to use.

None
tool_responses list[genkit.types.Part] | None

Optional. tool_responses should contain a list of tool response parts corresponding to interrupt tool request parts from the most recent model message. Each entry must have a matching name and ref (if supplied) for its tool request counterpart.

None
config genkit.types.GenerationCommonConfig | dict[str, typing.Any] | None

Optional. A GenerationCommonConfig object or a dictionary containing configuration parameters for the generation process. This allows fine-tuning the model's behavior.

None
max_turns int | None

Optional. The maximum number of turns in a conversation.

None
on_chunk genkit.blocks.generate.StreamingCallback | None

Optional. A callback function of type ModelStreamingCallback that is called for each chunk of generated text during streaming.

None
context dict[str, typing.Any] | None

Optional. A dictionary containing additional context information that can be used during generation.

None
output_format str | None

Optional. The format to use for the output (e.g., 'json').

None
output_content_type str | None

Optional. The content type of the output.

None
output_instructions bool | str | None

Optional. Instructions for formatting the output.

None
output_schema type | dict[str, typing.Any] | None

Optional. Schema defining the structure of the output.

None
output_constrained bool | None

Optional. Whether to constrain the output to the schema.

None
use list[genkit.blocks.model.ModelMiddleware] | None

Optional. A list of ModelMiddleware functions to apply to the generation process. Middleware can be used to intercept and modify requests and responses.

None
docs list[genkit.types.DocumentData] | None

Optional. A list of documents to be used for grounding.

None

Returns:

Type Description
genkit.blocks.model.GenerateResponseWrapper

A GenerateResponseWrapper object containing the model's response, which may include generated text, tool requests, or other relevant information.

Note
  • The tools, return_tool_requests, and tool_choice arguments are used for models that support tool usage.
  • The on_chunk argument enables streaming responses, allowing you to process the generated content as it becomes available.
Source code in packages/genkit/src/genkit/ai/_aio.py
async def generate(
    self,
    model: str | None = None,
    prompt: str | Part | list[Part] | None = None,
    system: str | Part | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: list[str] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice = None,
    tool_responses: list[Part] | None = None,
    config: GenerationCommonConfig | dict[str, Any] | None = None,
    max_turns: int | None = None,
    on_chunk: ModelStreamingCallback | None = None,
    context: dict[str, Any] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: bool | str | None = None,
    output_schema: type | dict[str, Any] | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[DocumentData] | None = None,
) -> GenerateResponseWrapper:
    """Generate text or structured data with a language model.

    The request-shaping arguments are bundled by
    ``to_generate_action_options`` and the result is handed to
    ``generate_action`` together with the streaming callback, middleware
    and context.

    Args:
        model: Optional. Name of the model to use. If omitted, a default
            model may be used.
        prompt: Optional. A prompt string, a `Part`, or a list of `Part`
            objects given to the model as input; used for simple text
            generation.
        system: Optional. A system message string, a `Part`, or a list of
            `Part` objects providing context or instructions to the model,
            especially for chat-based models.
        messages: Optional. A list of `Message` objects representing the
            conversation history, used by chat-based models to maintain
            context.
        tools: Optional. Names of the tools the model may use.
        return_tool_requests: Optional. If `True`, the model returns tool
            requests instead of executing them directly.
        tool_choice: Optional. A `ToolChoice` value specifying how the
            model should choose which tool to use.
        tool_responses: Optional. Tool response parts corresponding to
            interrupt tool request parts from the most recent model
            message. Each entry must have a matching `name` and `ref` (if
            supplied) for its tool request counterpart.
        config: Optional. A `GenerationCommonConfig` object or a dictionary
            of configuration parameters for the generation process.
        max_turns: Optional. The maximum number of turns in a conversation.
        on_chunk: Optional. A `ModelStreamingCallback` called for each
            chunk of generated content during streaming.
        context: Optional. Additional context information for generation.
            When empty or omitted, ``ActionRunContext._current_context()``
            is used instead.
        output_format: Optional. The format to use for the output (e.g.,
            'json').
        output_content_type: Optional. The content type of the output.
        output_instructions: Optional. Instructions for formatting the
            output.
        output_schema: Optional. Schema defining the structure of the
            output.
        output_constrained: Optional. Whether to constrain the output to
            the schema.
        use: Optional. `ModelMiddleware` functions applied to the
            generation process; middleware can intercept and modify
            requests and responses.
        docs: Optional. Documents to be used for grounding.

    Returns:
        A `GenerateResponseWrapper` containing the model's response, which
        may include generated text, tool requests, or other relevant
        information.

    Note:
        - `tools`, `return_tool_requests`, and `tool_choice` apply to
          models that support tool usage.
        - `on_chunk` enables streaming, letting the caller process
          generated content as it becomes available.
    """
    action_options = to_generate_action_options(
        registry=self.registry,
        model=model,
        prompt=prompt,
        system=system,
        messages=messages,
        tools=tools,
        return_tool_requests=return_tool_requests,
        tool_choice=tool_choice,
        tool_responses=tool_responses,
        config=config,
        max_turns=max_turns,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_schema=output_schema,
        output_constrained=output_constrained,
        docs=docs,
    )
    return await generate_action(
        self.registry,
        action_options,
        on_chunk=on_chunk,
        middleware=use,
        # A falsy context (None or empty) falls back to the current one.
        context=context or ActionRunContext._current_context(),
    )

generate_stream(model=None, prompt=None, system=None, messages=None, tools=None, return_tool_requests=None, tool_choice=None, config=None, max_turns=None, context=None, output_format=None, output_content_type=None, output_instructions=None, output_schema=None, output_constrained=None, use=None, docs=None, timeout=None)

Streams generated text or structured data using a language model.

This function provides a flexible interface for interacting with various language models, supporting both simple text generation and more complex interactions involving tools and structured conversations.

Parameters:

Name Type Description Default
model str | None

Optional. The name of the model to use for generation. If not provided, a default model may be used.

None
prompt str | genkit.types.Part | list[genkit.types.Part] | None

Optional. A single prompt string, a Part object, or a list of Part objects to provide as input to the model. This is used for simple text generation.

None
system str | genkit.types.Part | list[genkit.types.Part] | None

Optional. A system message string, a Part object, or a list of Part objects to provide context or instructions to the model, especially for chat-based models.

None
messages list[genkit.types.Message] | None

Optional. A list of Message objects representing a conversation history. This is used for chat-based models to maintain context.

None
tools list[str] | None

Optional. A list of tool names (strings) that the model can use.

None
return_tool_requests bool | None

Optional. If True, the model will return tool requests instead of executing them directly.

None
tool_choice genkit.types.ToolChoice

Optional. A ToolChoice object specifying how the model should choose which tool to use.

None
config genkit.types.GenerationCommonConfig | dict[str, typing.Any] | None

Optional. A GenerationCommonConfig object or a dictionary containing configuration parameters for the generation process. This allows fine-tuning the model's behavior.

None
max_turns int | None

Optional. The maximum number of turns in a conversation.

None
context dict[str, typing.Any] | None

Optional. A dictionary containing additional context information that can be used during generation.

None
output_format str | None

Optional. The format to use for the output (e.g., 'json').

None
output_content_type str | None

Optional. The content type of the output.

None
output_instructions bool | str | None

Optional. Instructions for formatting the output.

None
output_schema type | dict[str, typing.Any] | None

Optional. Schema defining the structure of the output.

None
output_constrained bool | None

Optional. Whether to constrain the output to the schema.

None
use list[genkit.blocks.model.ModelMiddleware] | None

Optional. A list of ModelMiddleware functions to apply to the generation process. Middleware can be used to intercept and modify requests and responses.

None
docs list[genkit.types.DocumentData] | None

Optional. A list of documents to be used for grounding.

None
timeout float | None

Optional. The timeout for the streaming action.

None

Returns:

Type Description
tuple[collections.abc.AsyncIterator[genkit.blocks.model.GenerateResponseChunkWrapper], asyncio.Future[genkit.blocks.model.GenerateResponseWrapper]]

A tuple of two items: an async iterator that yields each GenerateResponseChunkWrapper as it is produced, and a future for the final GenerateResponseWrapper, which may include generated text, tool requests, or other relevant information.

Note
  • The tools, return_tool_requests, and tool_choice arguments are used for models that support tool usage.
  • Streamed chunks are delivered through the returned async iterator, allowing you to process the generated content as it becomes available; generate_stream does not take an on_chunk argument.
Source code in packages/genkit/src/genkit/ai/_aio.py
def generate_stream(
    self,
    model: str | None = None,
    prompt: str | Part | list[Part] | None = None,
    system: str | Part | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: list[str] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice = None,
    config: GenerationCommonConfig | dict[str, Any] | None = None,
    max_turns: int | None = None,
    context: dict[str, Any] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: bool | str | None = None,
    output_schema: type | dict[str, Any] | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[DocumentData] | None = None,
    timeout: float | None = None,
) -> tuple[
    AsyncIterator[GenerateResponseChunkWrapper],
    Future[GenerateResponseWrapper],
]:
    """Streams generated text or structured data using a language model.

    This is the streaming counterpart of `generate`: it takes the same
    request arguments (except `on_chunk` and `tool_responses`), wires a
    `Channel` in as the chunk callback, and returns that channel together
    with its completion future.

    Args:
        model: Optional. The name of the model to use for generation. If not
            provided, a default model may be used.
        prompt: Optional. A single prompt string, a `Part` object, or a list
            of `Part` objects to provide as input to the model. This is used
            for simple text generation.
        system: Optional. A system message string, a `Part` object, or a
            list of `Part` objects to provide context or instructions to the
            model, especially for chat-based models.
        messages: Optional. A list of `Message` objects representing a
            conversation history. This is used for chat-based models to
            maintain context.
        tools: Optional. A list of tool names (strings) that the model can
            use.
        return_tool_requests: Optional. If `True`, the model will return
            tool requests instead of executing them directly.
        tool_choice: Optional. A `ToolChoice` object specifying how the
            model should choose which tool to use.
        config: Optional. A `GenerationCommonConfig` object or a dictionary
            containing configuration parameters for the generation process.
            This allows fine-tuning the model's behavior.
        max_turns: Optional. The maximum number of turns in a conversation.
        context: Optional. A dictionary containing additional context
            information that can be used during generation.
        output_format: Optional. The format to use for the output (e.g.,
            'json').
        output_content_type: Optional. The content type of the output.
        output_instructions: Optional. Instructions for formatting the
            output.
        output_schema: Optional. Schema defining the structure of the
            output.
        output_constrained: Optional. Whether to constrain the output to the
            schema.
        use: Optional. A list of `ModelMiddleware` functions to apply to the
            generation process. Middleware can be used to intercept and
            modify requests and responses.
        docs: Optional. A list of documents to be used for grounding.
        timeout: Optional. The timeout passed to the underlying `Channel`.

    Returns:
        A `(stream, response)` tuple. `stream` is the `Channel` that
        receives every chunk emitted during generation, and `response` is
        the channel's `closed` attribute, annotated as a future of the
        final `GenerateResponseWrapper`.

    Note:
        - The `tools`, `return_tool_requests`, and `tool_choice` arguments
          are used for models that support tool usage.
        - There is no `on_chunk` argument here; chunks are consumed from
          the returned stream instead.
    """
    stream = Channel(timeout=timeout)

    # `generate` is a coroutine function, so this call only creates the
    # coroutine; it is handed to the channel below rather than awaited here.
    resp = self.generate(
        model=model,
        prompt=prompt,
        system=system,
        messages=messages,
        tools=tools,
        return_tool_requests=return_tool_requests,
        tool_choice=tool_choice,
        config=config,
        max_turns=max_turns,
        context=context,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_schema=output_schema,
        output_constrained=output_constrained,
        docs=docs,
        use=use,
        # Every chunk produced by the model is pushed into the channel.
        on_chunk=stream.send,
    )
    stream.set_close_future(resp)

    return (stream, stream.closed)

retrieve(retriever=None, query=None, options=None) async

Retrieves documents based on query.

Parameters:

Name Type Description Default
retriever str | None

Optional retriever name to use.

None
query str | genkit.types.DocumentData | None

Text query or a DocumentData containing query text.

None
options dict[str, typing.Any] | None

retriever options

None

Returns:

Type Description
genkit.types.RetrieverResponse

The response returned by the retriever action.

Source code in packages/genkit/src/genkit/ai/_aio.py
async def retrieve(
    self,
    retriever: str | None = None,
    query: str | DocumentData | None = None,
    options: dict[str, Any] | None = None,
) -> RetrieverResponse:
    """Retrieves documents based on query.

    Args:
        retriever: Optional name of the retriever action to look up in the
            registry.
        query: Text query or a DocumentData containing query text. A plain
            string is converted with ``Document.from_text`` first.
        options: Optional retriever options, forwarded in the request.

    Returns:
        The ``response`` of the retriever action's run.
    """
    # Normalize a bare string so the request always carries a document.
    if isinstance(query, str):
        query = Document.from_text(query)

    retrieve_action = self.registry.lookup_action(ActionKind.RETRIEVER, retriever)

    return (await retrieve_action.arun(RetrieverRequest(query=query, options=options))).response

run_main(coro=None)

Runs the provided coroutine on an event loop.

Parameters:

Name Type Description Default
coro collections.abc.Coroutine[typing.Any, typing.Any, typing.Any] | None

The coroutine to run.

None

Returns:

Type Description
typing.Any

The result of the coroutine.

Source code in packages/genkit/src/genkit/ai/_base.py
def run_main(self, coro: Coroutine[Any, Any, Any] | None = None) -> Any:
    """Runs the provided coroutine on an event loop.

    Args:
        coro: The coroutine to run.

    Returns:
        The result of the coroutine.
    """

    if not coro:

        async def blank_coro():
            pass

        coro = blank_coro()

    result = None
    if self._loop:

        async def run() -> Any:
            return await coro

        result = run_async(self._loop, run)
    else:
        result = asyncio.run(coro)
    self._join()
    return result

tool(name=None, description=None)

Decorator to register a function as a tool.

Parameters:

Name Type Description Default
name str | None

Optional name for the tool. If not provided, uses the function name.

None
description str | None

Description for the tool to be passed to the model; if not provided, uses the function docstring.

None

Returns:

Type Description
collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]

A decorator function that registers the tool.

Source code in packages/genkit/src/genkit/ai/_registry.py
def tool(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
    """Decorator to register a function as a tool.

    Args:
        name: Optional name for the flow. If not provided, uses the function
            name.
        description: Description for the tool to be passed to the model;
            if not provided, uses the function docstring.

    Returns:
        A decorator function that registers the tool.
    """

    def wrapper(func: Callable) -> Callable:
        """Register the decorated function as a tool.

        Args:
            func: The function to register as a tool.

        Returns:
            The wrapped function that executes the tool.
        """
        tool_name = name if name is not None else func.__name__
        tool_description = get_func_description(func, description)

        input_spec = inspect.getfullargspec(func)

        def tool_fn_wrapper(*args):
            match len(input_spec.args):
                case 0:
                    return func()
                case 1:
                    return func(args[0])
                case 2:
                    return func(args[0], ToolRunContext(args[1]))
                case _:
                    raise ValueError('tool must have 0-2 args...')

        action = self.registry.register_action(
            name=tool_name,
            kind=ActionKind.TOOL,
            description=tool_description,
            fn=tool_fn_wrapper,
            metadata_fn=func,
        )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            """Asynchronous wrapper for the tool function.

            Args:
                *args: Positional arguments to pass to the tool function.
                **kwargs: Keyword arguments to pass to the tool function.

            Returns:
                The response from the tool function.
            """
            return (await action.arun(*args, **kwargs)).response

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            """Synchronous wrapper for the tool function.

            Args:
                *args: Positional arguments to pass to the tool function.
                **kwargs: Keyword arguments to pass to the tool function.

            Returns:
                The response from the tool function.
            """
            return action.run(*args, **kwargs).response

        return async_wrapper if action.is_async else sync_wrapper

    return wrapper

User-facing API for interacting with Genkit registry.

Source code in packages/genkit/src/genkit/ai/_registry.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
class GenkitRegistry:
    """User-facing API for interacting with Genkit registry."""

    def __init__(self):
        """Initialize the Genkit registry."""
        self.registry: Registry = Registry()

    def flow(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
        """Decorator to register a function as a flow.

        Args:
            name: Optional name for the flow. If not provided, uses the
                function name.
            description: Optional description for the flow. If not provided,
                uses the function docstring.

        Returns:
            A decorator function that registers the flow.
        """

        def wrapper(func: Callable) -> Callable:
            """Register the decorated function as a flow.

            Args:
                func: The function to register as a flow.

            Returns:
                The wrapped function that executes the flow.
            """
            flow_name = name if name is not None else func.__name__
            flow_description = get_func_description(func, description)
            action = self.registry.register_action(
                name=flow_name,
                kind=ActionKind.FLOW,
                fn=func,
                description=flow_description,
                span_metadata={'genkit:metadata:flow:name': flow_name},
            )

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                """Asynchronous wrapper for the flow function.

                Args:
                    *args: Positional arguments to pass to the flow function.
                    **kwargs: Keyword arguments to pass to the flow function.

                Returns:
                    The response from the flow function.
                """
                return (await action.arun(*args, **kwargs)).response

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                """Synchronous wrapper for the flow function.

                Args:
                    *args: Positional arguments to pass to the flow function.
                    **kwargs: Keyword arguments to pass to the flow function.

                Returns:
                    The response from the flow function.
                """
                return action.run(*args, **kwargs).response

            return FlowWrapper(
                fn=async_wrapper if action.is_async else sync_wrapper,
                action=action,
            )

        return wrapper

    def tool(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
        """Decorator to register a function as a tool.

        Args:
            name: Optional name for the flow. If not provided, uses the function
                name.
            description: Description for the tool to be passed to the model;
                if not provided, uses the function docstring.

        Returns:
            A decorator function that registers the tool.
        """

        def wrapper(func: Callable) -> Callable:
            """Register the decorated function as a tool.

            Args:
                func: The function to register as a tool.

            Returns:
                The wrapped function that executes the tool.
            """
            tool_name = name if name is not None else func.__name__
            tool_description = get_func_description(func, description)

            input_spec = inspect.getfullargspec(func)

            def tool_fn_wrapper(*args):
                match len(input_spec.args):
                    case 0:
                        return func()
                    case 1:
                        return func(args[0])
                    case 2:
                        return func(args[0], ToolRunContext(args[1]))
                    case _:
                        raise ValueError('tool must have 0-2 args...')

            action = self.registry.register_action(
                name=tool_name,
                kind=ActionKind.TOOL,
                description=tool_description,
                fn=tool_fn_wrapper,
                metadata_fn=func,
            )

            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                """Asynchronous wrapper for the tool function.

                Args:
                    *args: Positional arguments to pass to the tool function.
                    **kwargs: Keyword arguments to pass to the tool function.

                Returns:
                    The response from the tool function.
                """
                return (await action.arun(*args, **kwargs)).response

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                """Synchronous wrapper for the tool function.

                Args:
                    *args: Positional arguments to pass to the tool function.
                    **kwargs: Keyword arguments to pass to the tool function.

                Returns:
                    The response from the tool function.
                """
                return action.run(*args, **kwargs).response

            return async_wrapper if action.is_async else sync_wrapper

        return wrapper

    def define_retriever(
        self,
        name: str,
        fn: RetrieverFn,
        config_schema: BaseModel | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        description: str | None = None,
    ) -> Callable[[Callable], Callable]:
        """Define a retriever action.

        Args:
            name: Name of the retriever.
            fn: Function implementing the retriever behavior.
            config_schema: Optional schema for retriever configuration.
            metadata: Optional metadata for the retriever.
            description: Optional description for the retriever.
        """
        retriever_meta = metadata if metadata else {}
        if 'retriever' not in retriever_meta:
            retriever_meta['retriever'] = {}
        if 'label' not in retriever_meta['retriever'] or not retriever_meta['retriever']['label']:
            retriever_meta['retriever']['label'] = name
        if config_schema:
            retriever_meta['retriever']['customOptions'] = to_json_schema(config_schema)

        retriever_description = get_func_description(fn, description)
        return self.registry.register_action(
            name=name,
            kind=ActionKind.RETRIEVER,
            fn=fn,
            metadata=retriever_meta,
            description=retriever_description,
        )

    def define_evaluator(
        self,
        name: str,
        display_name: str,
        definition: str,
        fn: EvaluatorFn,
        is_billed: bool = False,
        config_schema: BaseModel | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        description: str | None = None,
    ) -> Callable[[Callable], Callable]:
        """Define a evaluator action.

        This action runs the callback function on the every sample of
        the input dataset.

        Args:
            name: Name of the evaluator.
            fn: Function implementing the evaluator behavior.
            display_name: User-visible display name
            definition: User-visible evaluator definition
            is_billed: Whether the evaluator performs any billed actions
                        (paid  APIs, LLMs etc.)
            config_schema: Optional schema for evaluator configuration.
            metadata: Optional metadata for the evaluator.
            description: Optional description for the evaluator.
        """
        evaluator_meta = metadata if metadata else {}
        if 'evaluator' not in evaluator_meta:
            evaluator_meta['evaluator'] = {}
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_DEFINITION] = definition
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
        if 'label' not in evaluator_meta['evaluator'] or not evaluator_meta['evaluator']['label']:
            evaluator_meta['evaluator']['label'] = name
        if config_schema:
            evaluator_meta['evaluator']['customOptions'] = to_json_schema(config_schema)

        evaluator_description = get_func_description(fn, description)

        def eval_stepper_fn(req: EvalRequest) -> EvalResponse:
            eval_responses: list[EvalFnResponse] = []
            for index in range(len(req.dataset)):
                datapoint = req.dataset[index]
                if datapoint.test_case_id is None:
                    datapoint.test_case_id = str(uuid.uuid4())
                span_metadata = SpanMetadata(
                    name=f'Test Case {datapoint.test_case_id}',
                    metadata={'evaluator:evalRunId': req.eval_run_id},
                )
                try:
                    with run_in_new_span(span_metadata, labels={'genkit:type': 'evaluator'}) as span:
                        span_id = span.span_id()
                        trace_id = span.trace_id()
                        try:
                            span.set_input(datapoint)
                            test_case_output = fn(datapoint, req.options)
                            test_case_output.span_id = span_id
                            test_case_output.trace_id = trace_id
                            span.set_output(test_case_output)
                            eval_responses.append(test_case_output)
                        except Exception as e:
                            logger.debug(f'eval_stepper_fn error: {str(e)}')
                            logger.debug(traceback.format_exc())
                            evaluation = Score(
                                error=f'Evaluation of test case {datapoint.test_case_id} failed: \n{str(e)}'
                            )
                            eval_responses.append(
                                EvalFnResponse(
                                    span_id=span_id,
                                    trace_id=trace_id,
                                    test_case_id=datapoint.test_case_id,
                                    evaluation=evaluation,
                                )
                            )
                            # Raise to mark span as failed
                            raise e
                except Exception:
                    # Continue to process other points
                    continue
            return EvalResponse(eval_responses)

        return self.registry.register_action(
            name=name,
            kind=ActionKind.EVALUATOR,
            fn=eval_stepper_fn,
            metadata=evaluator_meta,
            description=evaluator_description,
        )

    def define_batch_evaluator(
        self,
        name: str,
        display_name: str,
        definition: str,
        fn: BatchEvaluatorFn,
        is_billed: bool = False,
        config_schema: BaseModel | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        description: str | None = None,
    ) -> Callable[[Callable], Callable]:
        """Define a batch evaluator action.

        This action runs the callback function on the entire dataset.

        Args:
            name: Name of the evaluator.
            fn: Function implementing the evaluator behavior.
            display_name: User-visible display name
            definition: User-visible evaluator definition
            is_billed: Whether the evaluator performs any billed actions
                        (paid  APIs, LLMs etc.)
            config_schema: Optional schema for evaluator configuration.
            metadata: Optional metadata for the evaluator.
            description: Optional description for the evaluator.
        """
        evaluator_meta = metadata if metadata else {}
        if 'evaluator' not in evaluator_meta:
            evaluator_meta['evaluator'] = {}
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_DEFINITION] = definition
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
        evaluator_meta['evaluator'][EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
        if 'label' not in evaluator_meta['evaluator'] or not evaluator_meta['evaluator']['label']:
            evaluator_meta['evaluator']['label'] = name
        if config_schema:
            evaluator_meta['evaluator']['customOptions'] = to_json_schema(config_schema)

        evaluator_description = get_func_description(fn, description)
        return self.registry.register_action(
            name=name,
            kind=ActionKind.EVALUATOR,
            fn=fn,
            metadata=evaluator_meta,
            description=evaluator_description,
        )

    def define_model(
        self,
        name: str,
        fn: ModelFn,
        config_schema: BaseModel | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        info: ModelInfo | None = None,
        description: str | None = None,
    ) -> Action:
        """Define a custom model action.

        Args:
            name: Name of the model.
            fn: Function implementing the model behavior.
            config_schema: Optional schema for model configuration.
            metadata: Optional metadata for the model.
            info: Optional ModelInfo for the model.
            description: Optional description for the model.
        """
        model_meta: dict[str, Any] = metadata if metadata else {}
        if info:
            model_meta['model'] = dump_dict(info)
        if 'model' not in model_meta:
            model_meta['model'] = {}
        if 'label' not in model_meta['model'] or not model_meta['model']['label']:
            model_meta['model']['label'] = name

        if config_schema:
            model_meta['model']['customOptions'] = to_json_schema(config_schema)

        model_description = get_func_description(fn, description)
        return self.registry.register_action(
            name=name,
            kind=ActionKind.MODEL,
            fn=fn,
            metadata=model_meta,
            description=model_description,
        )

    def define_embedder(
        self,
        name: str,
        fn: EmbedderFn,
        metadata: dict[str, Any] | None = None,
        description: str | None = None,
    ) -> Action:
        """Define a custom embedder action.

        Args:
            name: Name of the model.
            fn: Function implementing the embedder behavior.
            metadata: Optional metadata for the model.
            description: Optional description for the embedder.
        """
        embedder_description = get_func_description(fn, description)
        return self.registry.register_action(
            name=name,
            kind=ActionKind.EMBEDDER,
            fn=fn,
            metadata=metadata,
            description=embedder_description,
        )

    def define_format(self, format: FormatDef) -> None:
        """Registers a custom format in the registry.

        Args:
            format: The format to register.
        """
        self.registry.register_value('format', format.name, format)

    def define_prompt(
        self,
        variant: str | None = None,
        model: str | None = None,
        config: GenerationCommonConfig | dict[str, Any] | None = None,
        description: str | None = None,
        input_schema: type | dict[str, Any] | None = None,
        system: str | Part | list[Part] | None = None,
        prompt: str | Part | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: bool | str | None = None,
        output_schema: type | dict[str, Any] | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, Any] | None = None,
        tools: list[str] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        # TODO:
        #  docs: list[Document]
    ):
        """Define a prompt.

        Args:
            variant: Optional variant name for the prompt.
            model: Optional model name to use for the prompt.
            config: Optional configuration for the model.
            description: Optional description for the prompt.
            input_schema: Optional schema for the input to the prompt.
            system: Optional system message for the prompt.
            prompt: Optional prompt for the model.
            messages: Optional messages for the model.
            output_format: Optional output format for the prompt.
            output_content_type: Optional output content type for the prompt.
            output_instructions: Optional output instructions for the prompt.
            output_schema: Optional schema for the output from the prompt.
            output_constrained: Optional flag indicating whether the output
                should be constrained.
            max_turns: Optional maximum number of turns for the prompt.
            return_tool_requests: Optional flag indicating whether tool requests
                should be returned.
            metadata: Optional metadata for the prompt.
            tools: Optional list of tools to use for the prompt.
            tool_choice: Optional tool choice for the prompt.
            use: Optional list of model middlewares to use for the prompt.
        """
        return define_prompt(
            self.registry,
            variant=variant,
            model=model,
            config=config,
            description=description,
            input_schema=input_schema,
            system=system,
            prompt=prompt,
            messages=messages,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_schema=output_schema,
            output_constrained=output_constrained,
            max_turns=max_turns,
            return_tool_requests=return_tool_requests,
            metadata=metadata,
            tools=tools,
            tool_choice=tool_choice,
            use=use,
        )

__init__()

Initialize the Genkit registry.

Source code in packages/genkit/src/genkit/ai/_registry.py
def __init__(self):
    """Create a fresh ``Registry`` and store it on ``self.registry``."""
    fresh_registry: Registry = Registry()
    self.registry = fresh_registry

define_batch_evaluator(name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Define a batch evaluator action.

This action runs the callback function on the entire dataset.

Parameters:

Name Type Description Default
name str

Name of the evaluator.

required
fn genkit.blocks.evaluator.BatchEvaluatorFn

Function implementing the evaluator behavior.

required
display_name str

User-visible display name

required
definition str

User-visible evaluator definition

required
is_billed bool

Whether the evaluator performs any billed actions (paid APIs, LLMs etc.)

False
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for evaluator configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the evaluator.

None
description str | None

Optional description for the evaluator.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_batch_evaluator(
    self,
    name: str,
    display_name: str,
    definition: str,
    fn: BatchEvaluatorFn,
    is_billed: bool = False,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Define a batch evaluator action.

    This action runs the callback function on the entire dataset.

    Args:
        name: Name of the evaluator.
        fn: Function implementing the evaluator behavior.
        display_name: User-visible display name
        definition: User-visible evaluator definition
        is_billed: Whether the evaluator performs any billed actions
                    (paid  APIs, LLMs etc.)
        config_schema: Optional schema for evaluator configuration.
        metadata: Optional metadata for the evaluator. The dict is copied;
            the caller's object is left untouched.
        description: Optional description for the evaluator.

    Returns:
        The action returned by the registry for this evaluator.
    """
    # Copy both levels that get written to, so the caller's dict is not mutated.
    evaluator_meta: dict[str, Any] = dict(metadata) if metadata else {}
    section = dict(evaluator_meta.get('evaluator') or {})
    evaluator_meta['evaluator'] = section
    section[EVALUATOR_METADATA_KEY_DEFINITION] = definition
    section[EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
    section[EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
    # A missing or empty label falls back to the evaluator name.
    if not section.get('label'):
        section['label'] = name
    if config_schema:
        section['customOptions'] = to_json_schema(config_schema)

    evaluator_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.EVALUATOR,
        fn=fn,
        metadata=evaluator_meta,
        description=evaluator_description,
    )

define_embedder(name, fn, metadata=None, description=None)

Define a custom embedder action.

Parameters:

Name Type Description Default
name str

Name of the embedder.

required
fn genkit.blocks.embedding.EmbedderFn

Function implementing the embedder behavior.

required
metadata dict[str, typing.Any] | None

Optional metadata for the embedder.

None
description str | None

Optional description for the embedder.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_embedder(
    self,
    name: str,
    fn: EmbedderFn,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Register ``fn`` in the registry as an embedder action.

    Args:
        name: Name of the embedder.
        fn: Function implementing the embedder behavior.
        metadata: Optional metadata for the embedder; passed to the registry
            unchanged.
        description: Optional description for the embedder.

    Returns:
        The action returned by the registry for this embedder.
    """
    resolved_description = get_func_description(fn, description)
    register = self.registry.register_action
    return register(
        name=name,
        kind=ActionKind.EMBEDDER,
        fn=fn,
        metadata=metadata,
        description=resolved_description,
    )

define_evaluator(name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Define an evaluator action.

This action runs the callback function on every sample of the input dataset.

Parameters:

Name Type Description Default
name str

Name of the evaluator.

required
fn genkit.blocks.evaluator.EvaluatorFn

Function implementing the evaluator behavior.

required
display_name str

User-visible display name

required
definition str

User-visible evaluator definition

required
is_billed bool

Whether the evaluator performs any billed actions (paid APIs, LLMs etc.)

False
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for evaluator configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the evaluator.

None
description str | None

Optional description for the evaluator.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_evaluator(
    self,
    name: str,
    display_name: str,
    definition: str,
    fn: EvaluatorFn,
    is_billed: bool = False,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Action:
    """Define an evaluator action.

    This action runs the callback function on every sample of
    the input dataset.

    Args:
        name: Name of the evaluator.
        fn: Function implementing the evaluator behavior.
        display_name: User-visible display name
        definition: User-visible evaluator definition
        is_billed: Whether the evaluator performs any billed actions
                    (paid  APIs, LLMs etc.)
        config_schema: Optional schema for evaluator configuration.
        metadata: Optional metadata for the evaluator. The dict is copied;
            the caller's object is left untouched.
        description: Optional description for the evaluator.

    Returns:
        The action returned by the registry for this evaluator.
    """
    # Copy both levels that get written to, so the caller's dict is not mutated.
    evaluator_meta: dict[str, Any] = dict(metadata) if metadata else {}
    section = dict(evaluator_meta.get('evaluator') or {})
    evaluator_meta['evaluator'] = section
    section[EVALUATOR_METADATA_KEY_DEFINITION] = definition
    section[EVALUATOR_METADATA_KEY_DISPLAY_NAME] = display_name
    section[EVALUATOR_METADATA_KEY_IS_BILLED] = is_billed
    # A missing or empty label falls back to the evaluator name.
    if not section.get('label'):
        section['label'] = name
    if config_schema:
        section['customOptions'] = to_json_schema(config_schema)

    evaluator_description = get_func_description(fn, description)

    def eval_stepper_fn(req: EvalRequest) -> EvalResponse:
        """Run ``fn`` on each datapoint, one span per test case.

        A failing datapoint is recorded as an error score and does not stop
        the remaining datapoints from being evaluated.
        """
        eval_responses: list[EvalFnResponse] = []
        for datapoint in req.dataset:
            # Datapoints without an id get a generated one.
            if datapoint.test_case_id is None:
                datapoint.test_case_id = str(uuid.uuid4())
            span_metadata = SpanMetadata(
                name=f'Test Case {datapoint.test_case_id}',
                metadata={'evaluator:evalRunId': req.eval_run_id},
            )
            try:
                with run_in_new_span(span_metadata, labels={'genkit:type': 'evaluator'}) as span:
                    span_id = span.span_id()
                    trace_id = span.trace_id()
                    try:
                        span.set_input(datapoint)
                        test_case_output = fn(datapoint, req.options)
                        test_case_output.span_id = span_id
                        test_case_output.trace_id = trace_id
                        span.set_output(test_case_output)
                        eval_responses.append(test_case_output)
                    except Exception as e:
                        logger.debug(f'eval_stepper_fn error: {str(e)}')
                        logger.debug(traceback.format_exc())
                        evaluation = Score(
                            error=f'Evaluation of test case {datapoint.test_case_id} failed: \n{str(e)}'
                        )
                        eval_responses.append(
                            EvalFnResponse(
                                span_id=span_id,
                                trace_id=trace_id,
                                test_case_id=datapoint.test_case_id,
                                evaluation=evaluation,
                            )
                        )
                        # Re-raise so the exception leaves the span context,
                        # marking the span as failed.
                        raise
            except Exception:
                # Continue to process other points
                continue
        return EvalResponse(eval_responses)

    return self.registry.register_action(
        name=name,
        kind=ActionKind.EVALUATOR,
        fn=eval_stepper_fn,
        metadata=evaluator_meta,
        description=evaluator_description,
    )

define_format(format)

Registers a custom format in the registry.

Parameters:

Name Type Description Default
format genkit.blocks.formats.types.FormatDef

The format to register.

required
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_format(self, format: FormatDef) -> None:
    """Add a custom format to the registry.

    Args:
        format: The format definition to store. It is registered as a
            ``'format'`` value under its own ``name`` attribute.
    """
    register = self.registry.register_value
    register('format', format.name, format)

define_model(name, fn, config_schema=None, metadata=None, info=None, description=None)

Define a custom model action.

Parameters:

Name Type Description Default
name str

Name of the model.

required
fn genkit.blocks.model.ModelFn

Function implementing the model behavior.

required
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for model configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the model.

None
info genkit.core.typing.ModelInfo | None

Optional ModelInfo for the model.

None
description str | None

Optional description for the model.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_model(
    self,
    name: str,
    fn: ModelFn,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    info: ModelInfo | None = None,
    description: str | None = None,
) -> Action:
    """Define a custom model action.

    Args:
        name: Name of the model.
        fn: Function implementing the model behavior.
        config_schema: Optional schema for model configuration.
        metadata: Optional metadata for the model. The dict is copied; the
            caller's object is left untouched.
        info: Optional ModelInfo for the model. When given, its dumped form
            replaces any ``'model'`` entry found in ``metadata``.
        description: Optional description for the model.

    Returns:
        The action returned by the registry for this model.
    """
    # Copy both levels that get written to, so the caller's dict is not mutated.
    model_meta: dict[str, Any] = dict(metadata) if metadata else {}
    section = dump_dict(info) if info else dict(model_meta.get('model') or {})
    model_meta['model'] = section
    # A missing or empty label falls back to the model name.
    if not section.get('label'):
        section['label'] = name

    if config_schema:
        section['customOptions'] = to_json_schema(config_schema)

    model_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.MODEL,
        fn=fn,
        metadata=model_meta,
        description=model_description,
    )

define_prompt(variant=None, model=None, config=None, description=None, input_schema=None, system=None, prompt=None, messages=None, output_format=None, output_content_type=None, output_instructions=None, output_schema=None, output_constrained=None, max_turns=None, return_tool_requests=None, metadata=None, tools=None, tool_choice=None, use=None)

Define a prompt.

Parameters:

Name Type Description Default
variant str | None

Optional variant name for the prompt.

None
model str | None

Optional model name to use for the prompt.

None
config genkit.core.typing.GenerationCommonConfig | dict[str, typing.Any] | None

Optional configuration for the model.

None
description str | None

Optional description for the prompt.

None
input_schema type | dict[str, typing.Any] | None

Optional schema for the input to the prompt.

None
system str | genkit.core.typing.Part | list[genkit.core.typing.Part] | None

Optional system message for the prompt.

None
prompt str | genkit.core.typing.Part | list[genkit.core.typing.Part] | None

Optional prompt for the model.

None
messages str | list[genkit.core.typing.Message] | None

Optional messages for the model.

None
output_format str | None

Optional output format for the prompt.

None
output_content_type str | None

Optional output content type for the prompt.

None
output_instructions bool | str | None

Optional output instructions for the prompt.

None
output_schema type | dict[str, typing.Any] | None

Optional schema for the output from the prompt.

None
output_constrained bool | None

Optional flag indicating whether the output should be constrained.

None
max_turns int | None

Optional maximum number of turns for the prompt.

None
return_tool_requests bool | None

Optional flag indicating whether tool requests should be returned.

None
metadata dict[str, typing.Any] | None

Optional metadata for the prompt.

None
tools list[str] | None

Optional list of tools to use for the prompt.

None
tool_choice genkit.core.typing.ToolChoice | None

Optional tool choice for the prompt.

None
use list[genkit.blocks.model.ModelMiddleware] | None

Optional list of model middlewares to use for the prompt.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_prompt(
    self,
    variant: str | None = None,
    model: str | None = None,
    config: GenerationCommonConfig | dict[str, Any] | None = None,
    description: str | None = None,
    input_schema: type | dict[str, Any] | None = None,
    system: str | Part | list[Part] | None = None,
    prompt: str | Part | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: bool | str | None = None,
    output_schema: type | dict[str, Any] | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, Any] | None = None,
    tools: list[str] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    # TODO: accept a `docs` parameter as well:
    #  docs: list[Document]
):
    """Define a prompt against this instance's registry.

    This method is a thin convenience wrapper: every argument is forwarded
    unchanged, by keyword, to the ``define_prompt`` function visible at
    module scope, with ``self.registry`` supplied as its first positional
    argument. No validation or defaulting happens here.

    Args:
        variant: Optional variant name for the prompt.
        model: Optional model name to use for the prompt.
        config: Optional configuration for the model.
        description: Optional description for the prompt.
        input_schema: Optional schema for the input to the prompt.
        system: Optional system message for the prompt.
        prompt: Optional prompt for the model.
        messages: Optional messages for the model, either as a single
            string or as a list of ``Message`` objects.
        output_format: Optional output format for the prompt.
        output_content_type: Optional output content type for the prompt.
        output_instructions: Optional output instructions for the prompt.
        output_schema: Optional schema for the output from the prompt.
        output_constrained: Optional flag indicating whether the output
            should be constrained.
        max_turns: Optional maximum number of turns for the prompt.
        return_tool_requests: Optional flag indicating whether tool requests
            should be returned.
        metadata: Optional metadata for the prompt.
        tools: Optional list of tools to use for the prompt.
        tool_choice: Optional tool choice for the prompt.
        use: Optional list of model middlewares to use for the prompt.

    Returns:
        Whatever the module-scope ``define_prompt`` function returns.
        NOTE(review): presumably an executable prompt object; confirm and
        add a return annotation.
    """
    # Inside a method body the bare name `define_prompt` is looked up in the
    # module globals, so this calls the module-scope helper, not this method.
    return define_prompt(
        self.registry,
        variant=variant,
        model=model,
        config=config,
        description=description,
        input_schema=input_schema,
        system=system,
        prompt=prompt,
        messages=messages,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_schema=output_schema,
        output_constrained=output_constrained,
        max_turns=max_turns,
        return_tool_requests=return_tool_requests,
        metadata=metadata,
        tools=tools,
        tool_choice=tool_choice,
        use=use,
    )

define_retriever(name, fn, config_schema=None, metadata=None, description=None)

Define a retriever action.

Parameters:

Name Type Description Default
name str

Name of the retriever.

required
fn genkit.blocks.retriever.RetrieverFn

Function implementing the retriever behavior.

required
config_schema pydantic.BaseModel | dict[str, typing.Any] | None

Optional schema for retriever configuration.

None
metadata dict[str, typing.Any] | None

Optional metadata for the retriever.

None
description str | None

Optional description for the retriever.

None
Source code in packages/genkit/src/genkit/ai/_registry.py
def define_retriever(
    self,
    name: str,
    fn: RetrieverFn,
    config_schema: BaseModel | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
) -> Callable[[Callable], Callable]:
    """Define a retriever action.

    Builds the action metadata and registers ``fn`` in the registry as an
    ``ActionKind.RETRIEVER`` action. The metadata always carries a
    ``'retriever'`` entry whose ``'label'`` falls back to ``name`` when it
    is missing or empty, and whose ``'customOptions'`` holds the JSON schema
    of ``config_schema`` when one is given.

    The caller's ``metadata`` dict is not modified: both the top-level dict
    and its nested ``'retriever'`` entry are shallow-copied before any
    defaults are written.

    Args:
        name: Name of the retriever.
        fn: Function implementing the retriever behavior.
        config_schema: Optional schema for retriever configuration.
        metadata: Optional metadata for the retriever.
        description: Optional description for the retriever.

    Returns:
        The value returned by ``self.registry.register_action``.
        NOTE(review): the annotated return type describes a decorator, but
        this method returns the registration result directly; confirm and
        correct the annotation.
    """
    # Work on copies so the defaults filled in below never leak back into
    # the dict (or nested dict) the caller passed in.
    retriever_meta = dict(metadata) if metadata else {}
    retriever_options = dict(retriever_meta.get('retriever', {}))

    # A missing or empty label falls back to the retriever name.
    if not retriever_options.get('label'):
        retriever_options['label'] = name
    if config_schema:
        retriever_options['customOptions'] = to_json_schema(config_schema)
    retriever_meta['retriever'] = retriever_options

    retriever_description = get_func_description(fn, description)
    return self.registry.register_action(
        name=name,
        kind=ActionKind.RETRIEVER,
        fn=fn,
        metadata=retriever_meta,
        description=retriever_description,
    )

flow(name=None, description=None)

Decorator to register a function as a flow.

Parameters:

Name Type Description Default
name str | None

Optional name for the flow. If not provided, uses the function name.

None
description str | None

Optional description for the flow. If not provided, uses the function docstring.

None

Returns:

Type Description
collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]

A decorator function that registers the flow.

Source code in packages/genkit/src/genkit/ai/_registry.py
def flow(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
    """Decorator to register a function as a flow.

    Args:
        name: Optional name for the flow. If not provided, uses the
            function name.
        description: Optional description for the flow. If not provided,
            uses the function docstring.

    Returns:
        A decorator function that registers the flow.
    """

    def wrapper(func: Callable) -> Callable:
        """Register the decorated function as a flow.

        Args:
            func: The function to register as a flow.

        Returns:
            The wrapped function that executes the flow.
        """
        flow_name = name if name is not None else func.__name__
        flow_description = get_func_description(func, description)
        action = self.registry.register_action(
            name=flow_name,
            kind=ActionKind.FLOW,
            fn=func,
            description=flow_description,
            span_metadata={'genkit:metadata:flow:name': flow_name},
        )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            """Asynchronous wrapper for the flow function.

            Args:
                *args: Positional arguments to pass to the flow function.
                **kwargs: Keyword arguments to pass to the flow function.

            Returns:
                The response from the flow function.
            """
            return (await action.arun(*args, **kwargs)).response

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            """Synchronous wrapper for the flow function.

            Args:
                *args: Positional arguments to pass to the flow function.
                **kwargs: Keyword arguments to pass to the flow function.

            Returns:
                The response from the flow function.
            """
            return action.run(*args, **kwargs).response

        return FlowWrapper(
            fn=async_wrapper if action.is_async else sync_wrapper,
            action=action,
        )

    return wrapper

tool(name=None, description=None)

Decorator to register a function as a tool.

Parameters:

Name Type Description Default
name str | None

Optional name for the tool. If not provided, uses the function name.

None
description str | None

Description for the tool to be passed to the model; if not provided, uses the function docstring.

None

Returns:

Type Description
collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]

A decorator function that registers the tool.

Source code in packages/genkit/src/genkit/ai/_registry.py
def tool(self, name: str | None = None, description: str | None = None) -> Callable[[Callable], Callable]:
    """Decorator to register a function as a tool.

    Args:
        name: Optional name for the flow. If not provided, uses the function
            name.
        description: Description for the tool to be passed to the model;
            if not provided, uses the function docstring.

    Returns:
        A decorator function that registers the tool.
    """

    def wrapper(func: Callable) -> Callable:
        """Register the decorated function as a tool.

        Args:
            func: The function to register as a tool.

        Returns:
            The wrapped function that executes the tool.
        """
        tool_name = name if name is not None else func.__name__
        tool_description = get_func_description(func, description)

        input_spec = inspect.getfullargspec(func)

        def tool_fn_wrapper(*args):
            match len(input_spec.args):
                case 0:
                    return func()
                case 1:
                    return func(args[0])
                case 2:
                    return func(args[0], ToolRunContext(args[1]))
                case _:
                    raise ValueError('tool must have 0-2 args...')

        action = self.registry.register_action(
            name=tool_name,
            kind=ActionKind.TOOL,
            description=tool_description,
            fn=tool_fn_wrapper,
            metadata_fn=func,
        )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            """Asynchronous wrapper for the tool function.

            Args:
                *args: Positional arguments to pass to the tool function.
                **kwargs: Keyword arguments to pass to the tool function.

            Returns:
                The response from the tool function.
            """
            return (await action.arun(*args, **kwargs)).response

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            """Synchronous wrapper for the tool function.

            Args:
                *args: Positional arguments to pass to the tool function.
                **kwargs: Keyword arguments to pass to the tool function.

            Returns:
                The response from the tool function.
            """
            return action.run(*args, **kwargs).response

        return async_wrapper if action.is_async else sync_wrapper

    return wrapper

Bases: abc.ABC

Abstract base class for implementing Genkit plugins.

This class defines the interface that all plugins must implement. Plugins provide a way to extend functionality by registering new actions, models, or other capabilities.

Source code in packages/genkit/src/genkit/ai/_plugin.py
class Plugin(abc.ABC):
    """Base class that every Genkit plugin derives from.

    A plugin extends Genkit by registering new actions, models, or other
    capabilities. Subclasses must implement ``initialize``; overriding
    ``resolve_action`` is optional for now.
    """

    def plugin_name(self):
        """Return the name of the plugin.

        This base class does not define ``name`` itself; the attribute has
        to be provided by the subclass or the instance.

        Returns:
            The value of ``self.name``.
        """
        return self.name

    # TODO: https://github.com/firebase/genkit/issues/2438
    # @abc.abstractmethod
    def resolve_action(self, ai: GenkitRegistry, kind: ActionKind, name: str) -> None:
        """Resolve an action by adding it to the given GenkitRegistry.

        The base implementation does nothing.

        Args:
            ai: The Genkit registry.
            kind: The kind of action to resolve.
            name: The name of the action to resolve.

        Returns:
            None, action resolution is done by side-effect on the registry.
        """

    @abc.abstractmethod
    def initialize(self, ai: GenkitRegistry) -> None:
        """Set the plugin up against the given registry.

        Abstract: concrete plugins must override this method.

        Args:
            ai: Registry to register plugin functionality.

        Returns:
            None, initialization is done by side-effect on the registry.
        """

initialize(ai) abstractmethod

Initialize the plugin with the given registry.

Parameters:

Name Type Description Default
ai genkit.ai._registry.GenkitRegistry

Registry to register plugin functionality.

required

Returns:

Type Description
None

None, initialization is done by side-effect on the registry.

Source code in packages/genkit/src/genkit/ai/_plugin.py
@abc.abstractmethod
def initialize(self, ai: GenkitRegistry) -> None:
    """Set the plugin up against the given registry.

    Declared abstract, so concrete plugins must override it; this
    declaration itself has no behavior.

    Args:
        ai: Registry to register plugin functionality.

    Returns:
        None, initialization is done by side-effect on the registry.
    """

plugin_name()

The name of the plugin.

Returns:

Type Description

The name of the plugin.

Source code in packages/genkit/src/genkit/ai/_plugin.py
def plugin_name(self):
    """The name of the plugin.

    Reads the ``name`` attribute of the instance; an ``AttributeError`` is
    raised if the instance has no such attribute.
    NOTE(review): presumably each concrete plugin sets ``name`` to a string;
    confirm and add a return annotation.

    Returns:
        The name of the plugin, i.e. the value of ``self.name``.
    """
    return self.name

resolve_action(ai, kind, name)

Resolves an action by adding it to the provided GenkitRegistry.

Parameters:

Name Type Description Default
ai genkit.ai._registry.GenkitRegistry

The Genkit registry.

required
kind genkit.core.registry.ActionKind

The kind of action to resolve.

required
name str

The name of the action to resolve.

required

Returns:

Type Description
None

None, action resolution is done by side-effect on the registry.

Source code in packages/genkit/src/genkit/ai/_plugin.py
def resolve_action(self, ai: GenkitRegistry, kind: ActionKind, name: str) -> None:
    """Resolve an action by adding it to the given GenkitRegistry.

    This implementation does nothing and ignores all of its arguments.

    Args:
        ai: The Genkit registry.
        kind: The kind of action to resolve.
        name: The name of the action to resolve.

    Returns:
        None, action resolution is done by side-effect on the registry.
    """