API Reference

!!! note Full Genkit documentation is available at genkit.dev

genkit

genkit.Genkit

Genkit asyncio user-facing API.

Source code in packages/genkit/src/genkit/_ai/_aio.py
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
class Genkit:
    """Genkit asyncio user-facing API."""

    def __init__(
        self,
        plugins: list[Plugin] | None = None,
        model: str | None = None,
        prompt_dir: str | Path | None = None,
        reflection_server_spec: ServerSpec | None = None,
    ) -> None:
        self.registry: Registry = Registry()
        self._reflection_server_spec: ServerSpec | None = reflection_server_spec
        self._reflection_ready = threading.Event()
        self._initialize_registry(model, plugins)
        # Ensure the default generate action is registered for async usage.
        define_generate_action(self.registry)
        # In dev mode, start the reflection server immediately in a background
        # daemon thread so it's available regardless of which web framework (or
        # none) the user chooses.
        if is_dev_environment():
            self._start_reflection_background()

        # Load prompts
        load_path = prompt_dir
        if load_path is None:
            default_prompts_path = Path('./prompts')
            if default_prompts_path.is_dir():
                load_path = default_prompts_path

        if load_path:
            load_prompt_folder(self.registry, dir_path=load_path)

    # -------------------------------------------------------------------------
    # Registry methods
    # -------------------------------------------------------------------------

    @overload
    def flow(
        self,
        name: str | None = None,
        *,
        description: str | None = None,
        chunk_type: None = None,
    ) -> _FlowDecorator: ...

    @overload
    def flow(
        self,
        name: str | None = None,
        *,
        description: str | None = None,
        chunk_type: type[ChunkT],
    ) -> _FlowDecoratorWithChunk[ChunkT]: ...

    def flow(
        self,
        name: str | None = None,
        *,
        description: str | None = None,
        chunk_type: type[Any] | None = None,
    ) -> _FlowDecorator | _FlowDecoratorWithChunk[Any]:
        """Decorator to register an async function as a flow.

        Args:
            name: Optional name for the flow. Defaults to the function name.
            description: Optional description for the flow.
            chunk_type: Optional type for streaming chunks. When provided,
                the returned Action will be typed as Action[InputT, OutputT, ChunkT].

        Example:
            @ai.flow()
            async def my_flow(x: str) -> int: ...  # Action[str, int]

            @ai.flow(chunk_type=str)
            async def streaming_flow(x: int, ctx: ActionRunContext) -> str:
                ctx.send_chunk("progress")
                return "done"
            # Action[int, str, str]
        """
        if chunk_type is not None:
            return _FlowDecoratorWithChunk(self.registry, name, description, chunk_type)
        return _FlowDecorator(self.registry, name, description)

    def define_helper(self, name: str, fn: Callable[..., Any]) -> None:
        """Register a Handlebars helper function."""
        define_helper(self.registry, name, fn)

    def define_partial(self, name: str, source: str) -> None:
        """Register a Handlebars partial template."""
        define_partial(self.registry, name, source)

    def define_schema(self, name: str, schema: type[BaseModel]) -> type[BaseModel]:
        """Register a Pydantic schema for use in prompts."""
        define_schema(self.registry, name, schema)
        return schema

    def define_json_schema(self, name: str, json_schema: dict[str, object]) -> dict[str, object]:
        """Register a JSON schema for use in prompts."""
        self.registry.register_schema(name, json_schema)
        return json_schema

    def define_dynamic_action_provider(
        self,
        name: str,
        fn: DapFn,
        *,
        description: str | None = None,
        cache_ttl_millis: int | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> DynamicActionProvider:
        """Register a Dynamic Action Provider (DAP)."""
        return define_dap_block(
            self.registry,
            name,
            fn,
            description=description,
            cache_ttl_millis=cache_ttl_millis,
            metadata=metadata,
        )

    def tool(self, name: str | None = None, description: str | None = None) -> Callable[[Callable[..., Any]], Tool]:
        """Decorator to register a function as a tool."""

        def wrapper(func: Callable[..., Any]) -> Tool:
            return define_tool(self.registry, func, name, description)

        return wrapper

    def define_interrupt(
        self,
        name: str,
        *,
        input_schema: type[BaseModel] | dict[str, object] | None = None,
        description: str | None = None,
    ) -> Tool:
        """Register an interrupt tool that always pauses for user input.

        Args:
            name: Tool name
            input_schema: Optional input schema (Pydantic model or JSON schema dict)
            description: Tool description

        Returns:
            The registered interrupt tool

        Example:
            ask_user = ai.define_interrupt(
                name='ask_user',
                input_schema=Question,
                description='Ask the user a question',
            )
        """
        return define_interrupt(
            self.registry,
            name,
            description=description,
            input_schema=input_schema,
        )

    def define_evaluator(
        self,
        *,
        name: str,
        display_name: str,
        definition: str,
        fn: EvaluatorFn[Any],
        is_billed: bool = False,
        config_schema: type[BaseModel] | dict[str, object] | None = None,
        metadata: dict[str, object] | None = None,
        description: str | None = None,
    ) -> Action:
        """Register an evaluator action."""
        return define_evaluator(
            self.registry,
            name=name,
            display_name=display_name,
            definition=definition,
            fn=fn,
            is_billed=is_billed,
            config_schema=config_schema,
            metadata=metadata,
            description=description,
        )

    def define_batch_evaluator(
        self,
        *,
        name: str,
        display_name: str,
        definition: str,
        fn: BatchEvaluatorFn[Any],
        is_billed: bool = False,
        config_schema: type[BaseModel] | dict[str, object] | None = None,
        metadata: dict[str, object] | None = None,
        description: str | None = None,
    ) -> Action:
        """Register a batch evaluator action."""
        return define_batch_evaluator(
            self.registry,
            name=name,
            display_name=display_name,
            definition=definition,
            fn=fn,
            is_billed=is_billed,
            config_schema=config_schema,
            metadata=metadata,
            description=description,
        )

    def define_model(
        self,
        name: str,
        fn: ModelFn,
        config_schema: type[BaseModel] | dict[str, object] | None = None,
        metadata: dict[str, object] | None = None,
        info: ModelInfo | None = None,
        description: str | None = None,
    ) -> Action:
        """Register a custom model action."""
        return define_model(self.registry, name, fn, config_schema, metadata, info, description)

    def define_background_model(
        self,
        name: str,
        start: StartModelOpFn,
        check: CheckModelOpFn,
        cancel: CancelModelOpFn | None = None,
        label: str | None = None,
        info: ModelInfo | None = None,
        config_schema: type[BaseModel] | dict[str, object] | None = None,
        metadata: dict[str, object] | None = None,
        description: str | None = None,
    ) -> BackgroundAction:
        """Register a background model for long-running AI operations."""
        return define_background_model(
            registry=self.registry,
            name=name,
            start=start,
            check=check,
            cancel=cancel,
            label=label,
            info=info,
            config_schema=config_schema,
            metadata=metadata,
            description=description,
        )

    def define_embedder(
        self,
        name: str,
        fn: EmbedderFn,
        options: EmbedderOptions | None = None,
        metadata: dict[str, object] | None = None,
        description: str | None = None,
    ) -> Action:
        """Register a custom embedder action."""
        return define_embedder(self.registry, name, fn, options, metadata, description)

    def define_format(self, format: FormatDef) -> None:
        """Register a custom output format."""
        self.registry.register_value('format', format.name, format)

    # Overload 1: Both input_schema and output_schema typed -> ExecutablePrompt[InputT, OutputT]
    @overload
    def define_prompt(
        self,
        name: str | None = None,
        *,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        description: str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, object] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        input_schema: type[InputT],
        output_schema: type[OutputT],
    ) -> ExecutablePrompt[InputT, OutputT]: ...

    # Overload 2: Only input_schema typed -> ExecutablePrompt[InputT, Any]
    @overload
    def define_prompt(
        self,
        name: str | None = None,
        *,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        description: str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, object] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        input_schema: type[InputT],
        output_schema: dict[str, object] | str | None = None,
    ) -> ExecutablePrompt[InputT, Any]: ...

    # Overload 3: Only output_schema typed -> ExecutablePrompt[Any, OutputT]
    @overload
    def define_prompt(
        self,
        name: str | None = None,
        *,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        description: str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, object] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        input_schema: dict[str, object] | str | None = None,
        output_schema: type[OutputT],
    ) -> ExecutablePrompt[Any, OutputT]: ...

    # Overload 4: Neither typed -> ExecutablePrompt[Any, Any]
    @overload
    def define_prompt(
        self,
        name: str | None = None,
        *,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        description: str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, object] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        input_schema: dict[str, object] | str | None = None,
        output_schema: dict[str, object] | str | None = None,
    ) -> ExecutablePrompt[Any, Any]: ...

    def define_prompt(
        self,
        name: str | None = None,
        *,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        description: str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, object] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        input_schema: type | dict[str, object] | str | None = None,
        output_schema: type | dict[str, object] | str | None = None,
    ) -> ExecutablePrompt[Any, Any]:
        """Register a prompt template."""
        executable_prompt = ExecutablePrompt(
            self.registry,
            variant=variant,
            model=model,
            config=config,
            description=description,
            input_schema=input_schema,
            system=system,
            prompt=prompt,
            messages=messages,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_schema=output_schema,
            output_constrained=output_constrained,
            max_turns=max_turns,
            return_tool_requests=return_tool_requests,
            metadata=metadata,
            tools=tools,
            tool_choice=tool_choice,
            use=use,
            docs=docs,
            name=name,
        )
        if name:
            register_prompt_actions(self.registry, executable_prompt, name, variant)
        return executable_prompt

    # Overload 1: Neither typed -> ExecutablePrompt[Any, Any]
    @overload
    def prompt(
        self,
        name: str,
        *,
        variant: str | None = None,
        input_schema: None = None,
        output_schema: None = None,
    ) -> ExecutablePrompt[Any, Any]: ...

    # Overload 2: Only input_schema typed
    @overload
    def prompt(
        self,
        name: str,
        *,
        variant: str | None = None,
        input_schema: type[InputT],
        output_schema: None = None,
    ) -> ExecutablePrompt[InputT, Any]: ...

    # Overload 3: Only output_schema typed
    @overload
    def prompt(
        self,
        name: str,
        *,
        variant: str | None = None,
        input_schema: None = None,
        output_schema: type[OutputT],
    ) -> ExecutablePrompt[Any, OutputT]: ...

    # Overload 4: Both input_schema and output_schema typed
    @overload
    def prompt(
        self,
        name: str,
        *,
        variant: str | None = None,
        input_schema: type[InputT],
        output_schema: type[OutputT],
    ) -> ExecutablePrompt[InputT, OutputT]: ...

    def prompt(
        self,
        name: str,
        *,
        variant: str | None = None,
        input_schema: type[InputT] | None = None,
        output_schema: type[OutputT] | None = None,
    ) -> ExecutablePrompt[InputT, OutputT] | ExecutablePrompt[Any, Any]:
        """Look up a prompt by name and optional variant."""
        return ExecutablePrompt(
            registry=self.registry,
            name=name,
            variant=variant,
            input_schema=input_schema,
            output_schema=output_schema,
        )

    def define_resource(
        self,
        *,
        fn: ResourceFn,
        name: str | None = None,
        uri: str | None = None,
        template: str | None = None,
        description: str | None = None,
        metadata: dict[str, object] | None = None,
    ) -> Action:
        """Register a resource action."""
        opts: ResourceOptions = {}
        if name:
            opts['name'] = name
        if uri:
            opts['uri'] = uri
        if template:
            opts['template'] = template
        if description:
            opts['description'] = description
        if metadata:
            opts['metadata'] = metadata

        return define_resource(self.registry, opts, fn)

    # -------------------------------------------------------------------------
    # Server infrastructure methods
    # -------------------------------------------------------------------------

    def _start_reflection_background(self) -> None:
        """Start the Dev UI reflection server in a background daemon thread.

        If GENKIT_REFLECTION_V2_SERVER is set (the CLI launches the runtime in
        v2 mode and provides a WebSocket URL), run the v2 JSON-RPC client.
        Otherwise start the v1 HTTP server.
        """

        async def _run_server() -> None:
            v2_url = os.environ.get('GENKIT_REFLECTION_V2_SERVER')
            if v2_url:
                await logger.ainfo(f'Genkit Dev UI reflection v2 client connecting to {v2_url}')
                server_v2 = ReflectionServerV2(self.registry, v2_url)
                self._reflection_ready.set()
                await server_v2.run_forever()
                return

            sockets: list[socket.socket] | None = None
            spec = self._reflection_server_spec
            if spec is None:
                # Bind to port 0 to let OS choose available port, pass socket to uvicorn
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.bind(('127.0.0.1', 0))
                sock.listen(2048)
                host, port = sock.getsockname()
                spec = ServerSpec(scheme='http', host=host, port=port)
                self._reflection_server_spec = spec
                sockets = [sock]

            app = create_reflection_asgi_app(registry=self.registry)
            config = uvicorn.Config(app, host=spec.host, port=spec.port, loop='asyncio')
            server = ReflectionServer(config, ready=self._reflection_ready)
            async with RuntimeManager(spec, lazy_write=True) as runtime_manager:
                server_task = asyncio.create_task(server.serve(sockets=sockets))
                await asyncio.to_thread(self._reflection_ready.wait)

                if server.should_exit:
                    logger.warning(f'Reflection server at {spec.url} failed to start.')
                    return

                runtime_manager.write_runtime_file()
                await logger.ainfo(f'Genkit Dev UI reflection server running at {spec.url}')
                await server_task

        threading.Thread(
            target=lambda: asyncio.run(_run_server()),
            daemon=True,
            name='genkit-reflection-server',
        ).start()

    def _initialize_registry(self, model: str | None, plugins: list[Plugin] | None) -> None:
        """Initialize the registry with default model and plugins."""
        if model:
            self.registry.register_value('defaultModel', 'defaultModel', model)
        for fmt in built_in_formats:
            self.define_format(fmt)

        if not plugins:
            logger.warning('No plugins provided to Genkit')
        else:
            for plugin in plugins:
                if isinstance(plugin, Plugin):  # pyright: ignore[reportUnnecessaryIsInstance]
                    self.registry.register_plugin(plugin)
                else:
                    raise ValueError(f'Invalid {plugin=} provided to Genkit: must be of type `genkit.ai.Plugin`')

    def run_main(self, coro: Coroutine[Any, Any, T]) -> T | None:
        """Run the user's main coroutine, blocking in dev mode for the reflection server."""
        if not is_dev_environment():
            logger.info('Running in production mode.')
            return run_loop(coro)

        logger.info('Running in development mode.')

        async def dev_runner() -> T | None:
            user_result: T | None = None
            try:
                user_result = await coro
                logger.debug('User coroutine completed successfully.')
            except Exception:
                logger.exception('User coroutine failed')

            # Block until Ctrl+C (SIGINT handled by anyio) or SIGTERM, keeping
            # the daemon reflection thread alive.
            logger.info('Script done — Dev UI running. Press Ctrl+C to stop.')
            try:
                async with anyio.create_task_group() as tg:

                    async def _handle_sigterm(tg_: anyio.abc.TaskGroup) -> None:  # type: ignore[name-defined]
                        with anyio.open_signal_receiver(signal.SIGTERM) as sigs:
                            async for _ in sigs:
                                tg_.cancel_scope.cancel()
                                return

                    tg.start_soon(_handle_sigterm, tg)
                    await anyio.sleep_forever()
            except anyio.get_cancelled_exc_class():
                pass

            logger.info('Dev UI server stopped.')
            return user_result

        return anyio.run(dev_runner)

    # -------------------------------------------------------------------------
    # Genkit-specific methods (generation, embedding, retrieval, etc.)
    # -------------------------------------------------------------------------

    def _resolve_embedder_name(self, embedder: str | EmbedderRef | None) -> str:
        """Resolve embedder name from string or EmbedderRef."""
        if isinstance(embedder, EmbedderRef):
            return embedder.name
        elif isinstance(embedder, str):
            return embedder
        else:
            raise ValueError('Embedder must be specified as a string name or an EmbedderRef.')

    # Overload: output_schema=type[T] -> ModelResponse[T]
    @overload
    async def generate(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type[OutputT],
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
    ) -> ModelResponse[OutputT]: ...

    # Overload: no output_schema, dict, or union -> ModelResponse[Any]
    @overload
    async def generate(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type | dict | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
    ) -> ModelResponse[Any]: ...

    async def generate(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type | dict | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
    ) -> ModelResponse[Any]:
        """Generate text or structured data using a language model.

        ``tools`` is typed as ``Sequence`` rather than ``list`` because ``Sequence``
        is covariant: ``list[Tool]`` or ``list[str]`` are both assignable to
        ``Sequence[str | Tool]``, but not to ``list[str | Tool]``.
        """
        prompt_config = PromptConfig(
            model=model,
            prompt=prompt,
            system=system,
            messages=messages,
            tools=tools,
            return_tool_requests=return_tool_requests,
            tool_choice=tool_choice,
            resume_respond=resume_respond,
            resume_restart=resume_restart,
            resume_metadata=resume_metadata,
            config=config,
            max_turns=max_turns,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_schema=output_schema,
            output_constrained=output_constrained,
            docs=docs,
        )
        registry = await registry_with_inline_tools(self.registry, prompt_config.tools)
        gen_options = await to_generate_action_options(registry, prompt_config)
        return await generate_action(
            registry,
            gen_options,
            middleware=use,
            context=context if context else ActionRunContext._current_context(),  # pyright: ignore[reportPrivateUsage]
        )

    # Overload: output_schema=type[T] -> ModelStreamResponse[T]
    @overload
    def generate_stream(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type[OutputT],
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        timeout: float | None = None,
    ) -> ModelStreamResponse[OutputT]: ...

    # Overload: no output_schema, dict, or union -> ModelStreamResponse[Any]
    @overload
    def generate_stream(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type | dict | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        timeout: float | None = None,
    ) -> ModelStreamResponse[Any]: ...

    def generate_stream(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
        resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
        resume_metadata: dict[str, Any] | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type | dict | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        timeout: float | None = None,
    ) -> ModelStreamResponse[Any]:
        """Stream generated text, returning a ModelStreamResponse with .stream and .response."""
        channel: Channel[ModelResponseChunk, ModelResponse[Any]] = Channel(timeout=timeout)

        async def _run_generate() -> ModelResponse[Any]:
            prompt_config = PromptConfig(
                model=model,
                prompt=prompt,
                system=system,
                messages=messages,
                tools=tools,
                return_tool_requests=return_tool_requests,
                tool_choice=tool_choice,
                resume_respond=resume_respond,
                resume_restart=resume_restart,
                resume_metadata=resume_metadata,
                config=config,
                max_turns=max_turns,
                output_format=output_format,
                output_content_type=output_content_type,
                output_instructions=output_instructions,
                output_schema=output_schema,
                output_constrained=output_constrained,
                docs=docs,
            )
            registry = await registry_with_inline_tools(self.registry, prompt_config.tools)
            gen_options = await to_generate_action_options(registry, prompt_config)
            return await generate_action(
                registry,
                gen_options,
                on_chunk=lambda c: channel.send(c),
                middleware=use,
                context=context if context else ActionRunContext._current_context(),  # pyright: ignore[reportPrivateUsage]
            )

        response_future: asyncio.Future[ModelResponse[Any]] = asyncio.create_task(_run_generate())
        channel.set_close_future(response_future)

        return ModelStreamResponse[Any](channel=channel, response_future=response_future)

    async def embed(
        self,
        *,
        embedder: str | EmbedderRef | None = None,
        content: str | Document | None = None,
        metadata: dict[str, object] | None = None,
        options: dict[str, object] | None = None,
    ) -> list[Embedding]:
        """Generate vector embeddings for a single document or string."""
        embedder_name = self._resolve_embedder_name(embedder)
        embedder_config: dict[str, object] = {}

        # Extract config and version from EmbedderRef (not done for embed_many per JS behavior)
        if isinstance(embedder, EmbedderRef):
            embedder_config = embedder.config or {}
            if embedder.version:
                embedder_config['version'] = embedder.version  # Handle version from ref

        # Merge options passed to embed() with config from EmbedderRef
        final_options = {**(embedder_config or {}), **(options or {})}

        embed_action = await self.registry.resolve_embedder(embedder_name)
        if embed_action is None:
            raise ValueError(f'Embedder "{embedder_name}" not found')

        if content is None:
            raise ValueError('Content must be specified for embedding.')

        documents = [Document.from_text(content, metadata)] if isinstance(content, str) else [content]

        response = (
            await embed_action.run(
                EmbedRequest(
                    input=documents,  # pyright: ignore[reportArgumentType]
                    options=final_options,
                )
            )
        ).response
        return response.embeddings

    async def embed_many(
        self,
        *,
        embedder: str | EmbedderRef | None = None,
        content: list[str] | list[Document] | None = None,
        metadata: dict[str, object] | None = None,
        options: dict[str, object] | None = None,
    ) -> list[Embedding]:
        """Generate vector embeddings for multiple documents in a single batch call."""
        if content is None:
            raise ValueError('Content must be specified for embedding.')

        # Convert strings to Documents if needed
        documents: list[Document] = [
            Document.from_text(item, metadata) if isinstance(item, str) else item for item in content
        ]

        # Resolve embedder name (JS embedMany does not extract config/version from ref)
        embedder_name = self._resolve_embedder_name(embedder)

        embed_action = await self.registry.resolve_embedder(embedder_name)
        if embed_action is None:
            raise ValueError(f'Embedder "{embedder_name}" not found')

        response = (await embed_action.run(EmbedRequest(input=documents, options=options))).response  # type: ignore[arg-type]
        return response.embeddings

    async def evaluate(
        self,
        evaluator: str | EvaluatorRef | None = None,
        dataset: list[BaseDataPoint] | None = None,
        options: dict[str, object] | None = None,
        eval_run_id: str | None = None,
    ) -> EvalResponse:
        """Evaluate a dataset using the specified evaluator."""
        evaluator_name: str = ''
        evaluator_config: dict[str, object] = {}

        if isinstance(evaluator, EvaluatorRef):
            evaluator_name = evaluator.name
            evaluator_config = evaluator.config_schema or {}
        elif isinstance(evaluator, str):
            evaluator_name = evaluator
        else:
            raise ValueError('Evaluator must be specified as a string name or an EvaluatorRef.')

        final_options = {**(evaluator_config or {}), **(options or {})}

        eval_action = await self.registry.resolve_evaluator(evaluator_name)
        if eval_action is None:
            raise ValueError(f'Evaluator "{evaluator_name}" not found')

        if not eval_run_id:
            eval_run_id = str(uuid.uuid4())

        if dataset is None:
            raise ValueError('Dataset must be specified for evaluation.')

        return (
            await eval_action.run(
                EvalRequest(
                    dataset=dataset,
                    options=final_options,
                    eval_run_id=eval_run_id,
                )
            )
        ).response

    @staticmethod
    def current_context() -> dict[str, Any] | None:
        """Get the current execution context, or None if not in an action."""
        return ActionRunContext._current_context()  # pyright: ignore[reportPrivateUsage]

    async def run(
        self,
        *,
        name: str,
        fn: Callable[[], Awaitable[T]],
        metadata: dict[str, Any] | None = None,
    ) -> T:
        """Run a function as a discrete traced step within a flow."""
        if not inspect.iscoroutinefunction(fn):
            raise TypeError('fn must be a coroutine function')

        span_metadata = SpanMetadata(name=name, type='flowStep', metadata=metadata)
        with run_in_new_span(span_metadata) as span:
            try:
                result = await fn()
                output = (
                    result.model_dump_json(by_alias=True, exclude_none=True)
                    if isinstance(result, BaseModel)
                    else json.dumps(result)
                )
                span.set_attribute('genkit:output', output)
                return result
            except Exception:
                # We catch all exceptions here to ensure they are captured by
                # the trace span context manager before being re-raised.
                # The run_in_new_span context manager handles recording
                # the exception details.
                raise

    async def check_operation(self, operation: Operation) -> Operation:
        """Check the status of a long-running background operation."""
        return await check_operation(self.registry, operation)

    async def cancel_operation(self, operation: Operation) -> Operation:
        """Cancel a long-running background operation."""
        if not operation.action:
            raise ValueError('Provided operation is missing original request information')

        background_action = await lookup_background_action(self.registry, operation.action)
        if background_action is None:
            raise ValueError(f'Failed to resolve background action from original request: {operation.action}')

        return await background_action.cancel(operation)

    async def generate_operation(
        self,
        *,
        model: str | None = None,
        prompt: str | list[Part] | None = None,
        system: str | list[Part] | None = None,
        messages: list[Message] | None = None,
        tools: Sequence[str | Tool] | None = None,
        return_tool_requests: bool | None = None,
        tool_choice: ToolChoice | None = None,
        config: dict[str, object] | ModelConfig | None = None,
        max_turns: int | None = None,
        context: dict[str, object] | None = None,
        output_schema: type | dict | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_constrained: bool | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
    ) -> Operation:
        """Generate content using a long-running model, returning an Operation to poll."""
        # Resolve the model and check for long_running support
        resolved_model = model or cast(str | None, self.registry.lookup_value('defaultModel', 'defaultModel'))
        if not resolved_model:
            raise GenkitError(
                status='INVALID_ARGUMENT',
                message='No model specified for generate_operation.',
            )

        model_action = await self.registry.resolve_action(ActionKind.MODEL, resolved_model)
        if not model_action:
            raise GenkitError(
                status='NOT_FOUND',
                message=f"Model '{resolved_model}' not found.",
            )

        # Check if model supports long-running operations
        if not _model_supports_long_running(model_action):
            raise GenkitError(
                status='INVALID_ARGUMENT',
                message=f"Model '{model_action.name}' does not support long running operations.",
            )

        # Call generate
        response = await self.generate(
            model=model,
            prompt=prompt,
            system=system,
            messages=messages,
            tools=tools,
            return_tool_requests=return_tool_requests,
            tool_choice=tool_choice,
            config=config,
            max_turns=max_turns,
            context=context,
            output_schema=output_schema,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_constrained=output_constrained,
            use=use,
            docs=docs,
        )

        # Extract operation from response
        if not hasattr(response, 'operation') or not response.operation:
            raise GenkitError(
                status='FAILED_PRECONDITION',
                message=f"Model '{model_action.name}' did not return an operation.",
            )

        return response.operation

cancel_operation(operation) async

Cancel a long-running background operation.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
async def cancel_operation(self, operation: Operation) -> Operation:
    """Cancel a long-running background operation."""
    if not operation.action:
        raise ValueError('Provided operation is missing original request information')

    background_action = await lookup_background_action(self.registry, operation.action)
    if background_action is None:
        raise ValueError(f'Failed to resolve background action from original request: {operation.action}')

    return await background_action.cancel(operation)

check_operation(operation) async

Check the status of a long-running background operation.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1154
1155
1156
async def check_operation(self, operation: Operation) -> Operation:
    """Check the status of a long-running background operation."""
    return await check_operation(self.registry, operation)

current_context() staticmethod

Get the current execution context, or None if not in an action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1120
1121
1122
1123
@staticmethod
def current_context() -> dict[str, Any] | None:
    """Get the current execution context, or None if not in an action."""
    return ActionRunContext._current_context()  # pyright: ignore[reportPrivateUsage]

define_background_model(name, start, check, cancel=None, label=None, info=None, config_schema=None, metadata=None, description=None)

Register a background model for long-running AI operations.

Source code in packages/genkit/src/genkit/_ai/_aio.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def define_background_model(
    self,
    name: str,
    start: StartModelOpFn,
    check: CheckModelOpFn,
    cancel: CancelModelOpFn | None = None,
    label: str | None = None,
    info: ModelInfo | None = None,
    config_schema: type[BaseModel] | dict[str, object] | None = None,
    metadata: dict[str, object] | None = None,
    description: str | None = None,
) -> BackgroundAction:
    """Register a background model for long-running AI operations."""
    return define_background_model(
        registry=self.registry,
        name=name,
        start=start,
        check=check,
        cancel=cancel,
        label=label,
        info=info,
        config_schema=config_schema,
        metadata=metadata,
        description=description,
    )

define_batch_evaluator(*, name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Register a batch evaluator action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def define_batch_evaluator(
    self,
    *,
    name: str,
    display_name: str,
    definition: str,
    fn: BatchEvaluatorFn[Any],
    is_billed: bool = False,
    config_schema: type[BaseModel] | dict[str, object] | None = None,
    metadata: dict[str, object] | None = None,
    description: str | None = None,
) -> Action:
    """Register a batch evaluator action."""
    return define_batch_evaluator(
        self.registry,
        name=name,
        display_name=display_name,
        definition=definition,
        fn=fn,
        is_billed=is_billed,
        config_schema=config_schema,
        metadata=metadata,
        description=description,
    )

define_dynamic_action_provider(name, fn, *, description=None, cache_ttl_millis=None, metadata=None)

Register a Dynamic Action Provider (DAP).

Source code in packages/genkit/src/genkit/_ai/_aio.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def define_dynamic_action_provider(
    self,
    name: str,
    fn: DapFn,
    *,
    description: str | None = None,
    cache_ttl_millis: int | None = None,
    metadata: dict[str, Any] | None = None,
) -> DynamicActionProvider:
    """Register a Dynamic Action Provider (DAP)."""
    return define_dap_block(
        self.registry,
        name,
        fn,
        description=description,
        cache_ttl_millis=cache_ttl_millis,
        metadata=metadata,
    )

define_embedder(name, fn, options=None, metadata=None, description=None)

Register a custom embedder action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
391
392
393
394
395
396
397
398
399
400
def define_embedder(
    self,
    name: str,
    fn: EmbedderFn,
    options: EmbedderOptions | None = None,
    metadata: dict[str, object] | None = None,
    description: str | None = None,
) -> Action:
    """Register a custom embedder action."""
    return define_embedder(self.registry, name, fn, options, metadata, description)

define_evaluator(*, name, display_name, definition, fn, is_billed=False, config_schema=None, metadata=None, description=None)

Register an evaluator action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def define_evaluator(
    self,
    *,
    name: str,
    display_name: str,
    definition: str,
    fn: EvaluatorFn[Any],
    is_billed: bool = False,
    config_schema: type[BaseModel] | dict[str, object] | None = None,
    metadata: dict[str, object] | None = None,
    description: str | None = None,
) -> Action:
    """Register an evaluator action."""
    return define_evaluator(
        self.registry,
        name=name,
        display_name=display_name,
        definition=definition,
        fn=fn,
        is_billed=is_billed,
        config_schema=config_schema,
        metadata=metadata,
        description=description,
    )

define_format(format)

Register a custom output format.

Source code in packages/genkit/src/genkit/_ai/_aio.py
402
403
404
def define_format(self, format: FormatDef) -> None:
    """Register a custom output format."""
    self.registry.register_value('format', format.name, format)

define_helper(name, fn)

Register a Handlebars helper function.

Source code in packages/genkit/src/genkit/_ai/_aio.py
227
228
229
def define_helper(self, name: str, fn: Callable[..., Any]) -> None:
    """Register a Handlebars helper function."""
    define_helper(self.registry, name, fn)

define_interrupt(name, *, input_schema=None, description=None)

Register an interrupt tool that always pauses for user input.

Parameters:

Name Type Description Default
name str

Tool name

required
input_schema type[BaseModel] | dict[str, object] | None

Optional input schema (Pydantic model or JSON schema dict)

None
description str | None

Tool description

None

Returns:

Type Description
Tool

The registered interrupt tool

Example

ask_user = ai.define_interrupt( name='ask_user', input_schema=Question, description='Ask the user a question', )

Source code in packages/genkit/src/genkit/_ai/_aio.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def define_interrupt(
    self,
    name: str,
    *,
    input_schema: type[BaseModel] | dict[str, object] | None = None,
    description: str | None = None,
) -> Tool:
    """Register an interrupt tool that always pauses for user input.

    Args:
        name: Tool name
        input_schema: Optional input schema (Pydantic model or JSON schema dict)
        description: Tool description

    Returns:
        The registered interrupt tool

    Example:
        ask_user = ai.define_interrupt(
            name='ask_user',
            input_schema=Question,
            description='Ask the user a question',
        )
    """
    return define_interrupt(
        self.registry,
        name,
        description=description,
        input_schema=input_schema,
    )

define_json_schema(name, json_schema)

Register a JSON schema for use in prompts.

Source code in packages/genkit/src/genkit/_ai/_aio.py
240
241
242
243
def define_json_schema(self, name: str, json_schema: dict[str, object]) -> dict[str, object]:
    """Register a JSON schema for use in prompts."""
    self.registry.register_schema(name, json_schema)
    return json_schema

define_model(name, fn, config_schema=None, metadata=None, info=None, description=None)

Register a custom model action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
353
354
355
356
357
358
359
360
361
362
363
def define_model(
    self,
    name: str,
    fn: ModelFn,
    config_schema: type[BaseModel] | dict[str, object] | None = None,
    metadata: dict[str, object] | None = None,
    info: ModelInfo | None = None,
    description: str | None = None,
) -> Action:
    """Register a custom model action."""
    return define_model(self.registry, name, fn, config_schema, metadata, info, description)

define_partial(name, source)

Register a Handlebars partial template.

Source code in packages/genkit/src/genkit/_ai/_aio.py
231
232
233
def define_partial(self, name: str, source: str) -> None:
    """Register a Handlebars partial template."""
    define_partial(self.registry, name, source)

define_prompt(name=None, *, variant=None, model=None, config=None, description=None, system=None, prompt=None, messages=None, output_format=None, output_content_type=None, output_instructions=None, output_constrained=None, max_turns=None, return_tool_requests=None, metadata=None, tools=None, tool_choice=None, use=None, docs=None, input_schema=None, output_schema=None)

define_prompt(
    name: str | None = None,
    *,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    description: str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, object] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    input_schema: type[InputT],
    output_schema: type[OutputT]
) -> ExecutablePrompt[InputT, OutputT]
define_prompt(
    name: str | None = None,
    *,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    description: str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, object] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    input_schema: type[InputT],
    output_schema: dict[str, object] | str | None = None
) -> ExecutablePrompt[InputT, Any]
define_prompt(
    name: str | None = None,
    *,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    description: str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, object] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    input_schema: dict[str, object] | str | None = None,
    output_schema: type[OutputT]
) -> ExecutablePrompt[Any, OutputT]
define_prompt(
    name: str | None = None,
    *,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    description: str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, object] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    input_schema: dict[str, object] | str | None = None,
    output_schema: dict[str, object] | str | None = None
) -> ExecutablePrompt[Any, Any]

Register a prompt template.

Source code in packages/genkit/src/genkit/_ai/_aio.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def define_prompt(
    self,
    name: str | None = None,
    *,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    description: str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, object] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    input_schema: type | dict[str, object] | str | None = None,
    output_schema: type | dict[str, object] | str | None = None,
) -> ExecutablePrompt[Any, Any]:
    """Register a prompt template."""
    executable_prompt = ExecutablePrompt(
        self.registry,
        variant=variant,
        model=model,
        config=config,
        description=description,
        input_schema=input_schema,
        system=system,
        prompt=prompt,
        messages=messages,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_schema=output_schema,
        output_constrained=output_constrained,
        max_turns=max_turns,
        return_tool_requests=return_tool_requests,
        metadata=metadata,
        tools=tools,
        tool_choice=tool_choice,
        use=use,
        docs=docs,
        name=name,
    )
    if name:
        register_prompt_actions(self.registry, executable_prompt, name, variant)
    return executable_prompt

define_resource(*, fn, name=None, uri=None, template=None, description=None, metadata=None)

Register a resource action.

Source code in packages/genkit/src/genkit/_ai/_aio.py
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
def define_resource(
    self,
    *,
    fn: ResourceFn,
    name: str | None = None,
    uri: str | None = None,
    template: str | None = None,
    description: str | None = None,
    metadata: dict[str, object] | None = None,
) -> Action:
    """Register a resource action."""
    opts: ResourceOptions = {}
    if name:
        opts['name'] = name
    if uri:
        opts['uri'] = uri
    if template:
        opts['template'] = template
    if description:
        opts['description'] = description
    if metadata:
        opts['metadata'] = metadata

    return define_resource(self.registry, opts, fn)

define_schema(name, schema)

Register a Pydantic schema for use in prompts.

Source code in packages/genkit/src/genkit/_ai/_aio.py
235
236
237
238
def define_schema(self, name: str, schema: type[BaseModel]) -> type[BaseModel]:
    """Register a Pydantic schema for use in prompts."""
    define_schema(self.registry, name, schema)
    return schema

embed(*, embedder=None, content=None, metadata=None, options=None) async

Generate vector embeddings for a single document or string.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
async def embed(
    self,
    *,
    embedder: str | EmbedderRef | None = None,
    content: str | Document | None = None,
    metadata: dict[str, object] | None = None,
    options: dict[str, object] | None = None,
) -> list[Embedding]:
    """Generate vector embeddings for a single document or string."""
    embedder_name = self._resolve_embedder_name(embedder)
    embedder_config: dict[str, object] = {}

    # Extract config and version from EmbedderRef (not done for embed_many per JS behavior)
    if isinstance(embedder, EmbedderRef):
        embedder_config = embedder.config or {}
        if embedder.version:
            embedder_config['version'] = embedder.version  # Handle version from ref

    # Merge options passed to embed() with config from EmbedderRef
    final_options = {**(embedder_config or {}), **(options or {})}

    embed_action = await self.registry.resolve_embedder(embedder_name)
    if embed_action is None:
        raise ValueError(f'Embedder "{embedder_name}" not found')

    if content is None:
        raise ValueError('Content must be specified for embedding.')

    documents = [Document.from_text(content, metadata)] if isinstance(content, str) else [content]

    response = (
        await embed_action.run(
            EmbedRequest(
                input=documents,  # pyright: ignore[reportArgumentType]
                options=final_options,
            )
        )
    ).response
    return response.embeddings

embed_many(*, embedder=None, content=None, metadata=None, options=None) async

Generate vector embeddings for multiple documents in a single batch call.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
async def embed_many(
    self,
    *,
    embedder: str | EmbedderRef | None = None,
    content: list[str] | list[Document] | None = None,
    metadata: dict[str, object] | None = None,
    options: dict[str, object] | None = None,
) -> list[Embedding]:
    """Generate vector embeddings for multiple documents in a single batch call."""
    if content is None:
        raise ValueError('Content must be specified for embedding.')

    # Convert strings to Documents if needed
    documents: list[Document] = [
        Document.from_text(item, metadata) if isinstance(item, str) else item for item in content
    ]

    # Resolve embedder name (JS embedMany does not extract config/version from ref)
    embedder_name = self._resolve_embedder_name(embedder)

    embed_action = await self.registry.resolve_embedder(embedder_name)
    if embed_action is None:
        raise ValueError(f'Embedder "{embedder_name}" not found')

    response = (await embed_action.run(EmbedRequest(input=documents, options=options))).response  # type: ignore[arg-type]
    return response.embeddings

evaluate(evaluator=None, dataset=None, options=None, eval_run_id=None) async

Evaluate a dataset using the specified evaluator.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
async def evaluate(
    self,
    evaluator: str | EvaluatorRef | None = None,
    dataset: list[BaseDataPoint] | None = None,
    options: dict[str, object] | None = None,
    eval_run_id: str | None = None,
) -> EvalResponse:
    """Evaluate a dataset using the specified evaluator."""
    evaluator_name: str = ''
    evaluator_config: dict[str, object] = {}

    if isinstance(evaluator, EvaluatorRef):
        evaluator_name = evaluator.name
        evaluator_config = evaluator.config_schema or {}
    elif isinstance(evaluator, str):
        evaluator_name = evaluator
    else:
        raise ValueError('Evaluator must be specified as a string name or an EvaluatorRef.')

    final_options = {**(evaluator_config or {}), **(options or {})}

    eval_action = await self.registry.resolve_evaluator(evaluator_name)
    if eval_action is None:
        raise ValueError(f'Evaluator "{evaluator_name}" not found')

    if not eval_run_id:
        eval_run_id = str(uuid.uuid4())

    if dataset is None:
        raise ValueError('Dataset must be specified for evaluation.')

    return (
        await eval_action.run(
            EvalRequest(
                dataset=dataset,
                options=final_options,
                eval_run_id=eval_run_id,
            )
        )
    ).response

flow(name=None, *, description=None, chunk_type=None)

flow(
    name: str | None = None,
    *,
    description: str | None = None,
    chunk_type: None = None
) -> _FlowDecorator
flow(
    name: str | None = None,
    *,
    description: str | None = None,
    chunk_type: type[ChunkT]
) -> _FlowDecoratorWithChunk[ChunkT]

Decorator to register an async function as a flow.

Parameters:

Name Type Description Default
name str | None

Optional name for the flow. Defaults to the function name.

None
description str | None

Optional description for the flow.

None
chunk_type type[Any] | None

Optional type for streaming chunks. When provided, the returned Action will be typed as Action[InputT, OutputT, ChunkT].

None
Example

@ai.flow() async def my_flow(x: str) -> int: ... # Action[str, int]

@ai.flow(chunk_type=str) async def streaming_flow(x: int, ctx: ActionRunContext) -> str: ctx.send_chunk("progress") return "done"

Action[int, str, str]

Source code in packages/genkit/src/genkit/_ai/_aio.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def flow(
    self,
    name: str | None = None,
    *,
    description: str | None = None,
    chunk_type: type[Any] | None = None,
) -> _FlowDecorator | _FlowDecoratorWithChunk[Any]:
    """Decorator to register an async function as a flow.

    Args:
        name: Optional name for the flow. Defaults to the function name.
        description: Optional description for the flow.
        chunk_type: Optional type for streaming chunks. When provided,
            the returned Action will be typed as Action[InputT, OutputT, ChunkT].

    Example:
        @ai.flow()
        async def my_flow(x: str) -> int: ...  # Action[str, int]

        @ai.flow(chunk_type=str)
        async def streaming_flow(x: int, ctx: ActionRunContext) -> str:
            ctx.send_chunk("progress")
            return "done"
        # Action[int, str, str]
    """
    if chunk_type is not None:
        return _FlowDecoratorWithChunk(self.registry, name, description, chunk_type)
    return _FlowDecorator(self.registry, name, description)

generate(*, model=None, prompt=None, system=None, messages=None, tools=None, return_tool_requests=None, tool_choice=None, resume_respond=None, resume_restart=None, resume_metadata=None, config=None, max_turns=None, context=None, output_schema=None, output_format=None, output_content_type=None, output_instructions=None, output_constrained=None, use=None, docs=None) async

generate(
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: (
        ToolResponsePart | list[ToolResponsePart] | None
    ) = None,
    resume_restart: (
        ToolRequestPart | list[ToolRequestPart] | None
    ) = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type[OutputT],
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None
) -> ModelResponse[OutputT]
generate(
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: (
        ToolResponsePart | list[ToolResponsePart] | None
    ) = None,
    resume_restart: (
        ToolRequestPart | list[ToolRequestPart] | None
    ) = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type | dict | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None
) -> ModelResponse[Any]

Generate text or structured data using a language model.

tools is typed as Sequence rather than list because Sequence is covariant: list[Tool] or list[str] are both assignable to Sequence[str | Tool], but not to list[str | Tool].

Source code in packages/genkit/src/genkit/_ai/_aio.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
async def generate(
    self,
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
    resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type | dict | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
) -> ModelResponse[Any]:
    """Generate text or structured data using a language model.

    ``tools`` is typed as ``Sequence`` rather than ``list`` because ``Sequence``
    is covariant: ``list[Tool]`` or ``list[str]`` are both assignable to
    ``Sequence[str | Tool]``, but not to ``list[str | Tool]``.
    """
    prompt_config = PromptConfig(
        model=model,
        prompt=prompt,
        system=system,
        messages=messages,
        tools=tools,
        return_tool_requests=return_tool_requests,
        tool_choice=tool_choice,
        resume_respond=resume_respond,
        resume_restart=resume_restart,
        resume_metadata=resume_metadata,
        config=config,
        max_turns=max_turns,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_schema=output_schema,
        output_constrained=output_constrained,
        docs=docs,
    )
    registry = await registry_with_inline_tools(self.registry, prompt_config.tools)
    gen_options = await to_generate_action_options(registry, prompt_config)
    return await generate_action(
        registry,
        gen_options,
        middleware=use,
        context=context if context else ActionRunContext._current_context(),  # pyright: ignore[reportPrivateUsage]
    )

generate_operation(*, model=None, prompt=None, system=None, messages=None, tools=None, return_tool_requests=None, tool_choice=None, config=None, max_turns=None, context=None, output_schema=None, output_format=None, output_content_type=None, output_instructions=None, output_constrained=None, use=None, docs=None) async

Generate content using a long-running model, returning an Operation to poll.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
async def generate_operation(
    self,
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type | dict | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
) -> Operation:
    """Generate content using a long-running model, returning an Operation to poll."""
    # Resolve the model and check for long_running support
    resolved_model = model or cast(str | None, self.registry.lookup_value('defaultModel', 'defaultModel'))
    if not resolved_model:
        raise GenkitError(
            status='INVALID_ARGUMENT',
            message='No model specified for generate_operation.',
        )

    model_action = await self.registry.resolve_action(ActionKind.MODEL, resolved_model)
    if not model_action:
        raise GenkitError(
            status='NOT_FOUND',
            message=f"Model '{resolved_model}' not found.",
        )

    # Check if model supports long-running operations
    if not _model_supports_long_running(model_action):
        raise GenkitError(
            status='INVALID_ARGUMENT',
            message=f"Model '{model_action.name}' does not support long running operations.",
        )

    # Call generate
    response = await self.generate(
        model=model,
        prompt=prompt,
        system=system,
        messages=messages,
        tools=tools,
        return_tool_requests=return_tool_requests,
        tool_choice=tool_choice,
        config=config,
        max_turns=max_turns,
        context=context,
        output_schema=output_schema,
        output_format=output_format,
        output_content_type=output_content_type,
        output_instructions=output_instructions,
        output_constrained=output_constrained,
        use=use,
        docs=docs,
    )

    # Extract operation from response
    if not hasattr(response, 'operation') or not response.operation:
        raise GenkitError(
            status='FAILED_PRECONDITION',
            message=f"Model '{model_action.name}' did not return an operation.",
        )

    return response.operation

generate_stream(*, model=None, prompt=None, system=None, messages=None, tools=None, return_tool_requests=None, tool_choice=None, resume_respond=None, resume_restart=None, resume_metadata=None, config=None, max_turns=None, context=None, output_schema=None, output_format=None, output_content_type=None, output_instructions=None, output_constrained=None, use=None, docs=None, timeout=None)

generate_stream(
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: (
        ToolResponsePart | list[ToolResponsePart] | None
    ) = None,
    resume_restart: (
        ToolRequestPart | list[ToolRequestPart] | None
    ) = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type[OutputT],
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    timeout: float | None = None
) -> ModelStreamResponse[OutputT]
generate_stream(
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: (
        ToolResponsePart | list[ToolResponsePart] | None
    ) = None,
    resume_restart: (
        ToolRequestPart | list[ToolRequestPart] | None
    ) = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type | dict | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    timeout: float | None = None
) -> ModelStreamResponse[Any]

Stream generated text, returning a ModelStreamResponse with .stream and .response.

Source code in packages/genkit/src/genkit/_ai/_aio.py
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
def generate_stream(
    self,
    *,
    model: str | None = None,
    prompt: str | list[Part] | None = None,
    system: str | list[Part] | None = None,
    messages: list[Message] | None = None,
    tools: Sequence[str | Tool] | None = None,
    return_tool_requests: bool | None = None,
    tool_choice: ToolChoice | None = None,
    resume_respond: ToolResponsePart | list[ToolResponsePart] | None = None,
    resume_restart: ToolRequestPart | list[ToolRequestPart] | None = None,
    resume_metadata: dict[str, Any] | None = None,
    config: dict[str, object] | ModelConfig | None = None,
    max_turns: int | None = None,
    context: dict[str, object] | None = None,
    output_schema: type | dict | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_constrained: bool | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    timeout: float | None = None,
) -> ModelStreamResponse[Any]:
    """Stream generated text, returning a ModelStreamResponse with .stream and .response."""
    channel: Channel[ModelResponseChunk, ModelResponse[Any]] = Channel(timeout=timeout)

    async def _run_generate() -> ModelResponse[Any]:
        prompt_config = PromptConfig(
            model=model,
            prompt=prompt,
            system=system,
            messages=messages,
            tools=tools,
            return_tool_requests=return_tool_requests,
            tool_choice=tool_choice,
            resume_respond=resume_respond,
            resume_restart=resume_restart,
            resume_metadata=resume_metadata,
            config=config,
            max_turns=max_turns,
            output_format=output_format,
            output_content_type=output_content_type,
            output_instructions=output_instructions,
            output_schema=output_schema,
            output_constrained=output_constrained,
            docs=docs,
        )
        registry = await registry_with_inline_tools(self.registry, prompt_config.tools)
        gen_options = await to_generate_action_options(registry, prompt_config)
        return await generate_action(
            registry,
            gen_options,
            on_chunk=lambda c: channel.send(c),
            middleware=use,
            context=context if context else ActionRunContext._current_context(),  # pyright: ignore[reportPrivateUsage]
        )

    response_future: asyncio.Future[ModelResponse[Any]] = asyncio.create_task(_run_generate())
    channel.set_close_future(response_future)

    return ModelStreamResponse[Any](channel=channel, response_future=response_future)

prompt(name, *, variant=None, input_schema=None, output_schema=None)

prompt(
    name: str,
    *,
    variant: str | None = None,
    input_schema: None = None,
    output_schema: None = None
) -> ExecutablePrompt[Any, Any]
prompt(
    name: str,
    *,
    variant: str | None = None,
    input_schema: type[InputT],
    output_schema: None = None
) -> ExecutablePrompt[InputT, Any]
prompt(
    name: str,
    *,
    variant: str | None = None,
    input_schema: None = None,
    output_schema: type[OutputT]
) -> ExecutablePrompt[Any, OutputT]
prompt(
    name: str,
    *,
    variant: str | None = None,
    input_schema: type[InputT],
    output_schema: type[OutputT]
) -> ExecutablePrompt[InputT, OutputT]

Look up a prompt by name and optional variant.

Source code in packages/genkit/src/genkit/_ai/_aio.py
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
def prompt(
    self,
    name: str,
    *,
    variant: str | None = None,
    input_schema: type[InputT] | None = None,
    output_schema: type[OutputT] | None = None,
) -> ExecutablePrompt[InputT, OutputT] | ExecutablePrompt[Any, Any]:
    """Look up a prompt by name and optional variant."""
    return ExecutablePrompt(
        registry=self.registry,
        name=name,
        variant=variant,
        input_schema=input_schema,
        output_schema=output_schema,
    )

run(*, name, fn, metadata=None) async

Run a function as a discrete traced step within a flow.

Source code in packages/genkit/src/genkit/_ai/_aio.py
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
async def run(
    self,
    *,
    name: str,
    fn: Callable[[], Awaitable[T]],
    metadata: dict[str, Any] | None = None,
) -> T:
    """Run a function as a discrete traced step within a flow."""
    if not inspect.iscoroutinefunction(fn):
        raise TypeError('fn must be a coroutine function')

    span_metadata = SpanMetadata(name=name, type='flowStep', metadata=metadata)
    with run_in_new_span(span_metadata) as span:
        try:
            result = await fn()
            output = (
                result.model_dump_json(by_alias=True, exclude_none=True)
                if isinstance(result, BaseModel)
                else json.dumps(result)
            )
            span.set_attribute('genkit:output', output)
            return result
        except Exception:
            # We catch all exceptions here to ensure they are captured by
            # the trace span context manager before being re-raised.
            # The run_in_new_span context manager handles recording
            # the exception details.
            raise

run_main(coro)

Run the user's main coroutine, blocking in dev mode for the reflection server.

Source code in packages/genkit/src/genkit/_ai/_aio.py
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
def run_main(self, coro: Coroutine[Any, Any, T]) -> T | None:
    """Run the user's main coroutine, blocking in dev mode for the reflection server."""
    if not is_dev_environment():
        logger.info('Running in production mode.')
        return run_loop(coro)

    logger.info('Running in development mode.')

    async def dev_runner() -> T | None:
        user_result: T | None = None
        try:
            user_result = await coro
            logger.debug('User coroutine completed successfully.')
        except Exception:
            logger.exception('User coroutine failed')

        # Block until Ctrl+C (SIGINT handled by anyio) or SIGTERM, keeping
        # the daemon reflection thread alive.
        logger.info('Script done — Dev UI running. Press Ctrl+C to stop.')
        try:
            async with anyio.create_task_group() as tg:

                async def _handle_sigterm(tg_: anyio.abc.TaskGroup) -> None:  # type: ignore[name-defined]
                    with anyio.open_signal_receiver(signal.SIGTERM) as sigs:
                        async for _ in sigs:
                            tg_.cancel_scope.cancel()
                            return

                tg.start_soon(_handle_sigterm, tg)
                await anyio.sleep_forever()
        except anyio.get_cancelled_exc_class():
            pass

        logger.info('Dev UI server stopped.')
        return user_result

    return anyio.run(dev_runner)

tool(name=None, description=None)

Decorator to register a function as a tool.

Source code in packages/genkit/src/genkit/_ai/_aio.py
264
265
266
267
268
269
270
def tool(self, name: str | None = None, description: str | None = None) -> Callable[[Callable[..., Any]], Tool]:
    """Decorator to register a function as a tool."""

    def wrapper(func: Callable[..., Any]) -> Tool:
        return define_tool(self.registry, func, name, description)

    return wrapper

genkit.Plugin

Bases: ABC

Abstract base class for Genkit plugins.

Source code in packages/genkit/src/genkit/_core/_plugin.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Plugin(abc.ABC):
    """Abstract base class for Genkit plugins."""

    name: str  # plugin namespace

    @abc.abstractmethod
    async def init(self) -> list[Action]:
        """Lazy warm-up called once per plugin; return actions to pre-register."""
        ...

    @abc.abstractmethod
    async def resolve(self, action_type: ActionKind, name: str) -> Action | None:
        """Resolve a single action by kind and namespaced name."""
        ...

    @abc.abstractmethod
    async def list_actions(self) -> list[ActionMetadata]:
        """Return advertised actions for dev UI/reflection listing.

        ``ActionMetadata.action_type`` must be set (typically ``ActionKind.*``) and
        ``ActionMetadata.name`` must match resolution keys (typically
        ``{plugin.name}/localName`` for plugin-backed actions).
        """
        ...

    async def model(self, name: str) -> Action | None:
        """Resolve a model action by name (local or namespaced)."""
        target = name if '/' in name else f'{self.name}/{name}'
        return await self.resolve(ActionKind.MODEL, target)

    async def embedder(self, name: str) -> Action | None:
        """Resolve an embedder action by name (local or namespaced)."""
        target = name if '/' in name else f'{self.name}/{name}'
        return await self.resolve(ActionKind.EMBEDDER, target)

embedder(name) async

Resolve an embedder action by name (local or namespaced).

Source code in packages/genkit/src/genkit/_core/_plugin.py
55
56
57
58
async def embedder(self, name: str) -> Action | None:
    """Resolve an embedder action by name (local or namespaced)."""
    target = name if '/' in name else f'{self.name}/{name}'
    return await self.resolve(ActionKind.EMBEDDER, target)

init() abstractmethod async

Lazy warm-up called once per plugin; return actions to pre-register.

Source code in packages/genkit/src/genkit/_core/_plugin.py
30
31
32
33
@abc.abstractmethod
async def init(self) -> list[Action]:
    """Lazy warm-up called once per plugin; return actions to pre-register."""
    ...

list_actions() abstractmethod async

Return advertised actions for dev UI/reflection listing.

ActionMetadata.action_type must be set (typically ActionKind.*) and ActionMetadata.name must match resolution keys (typically {plugin.name}/localName for plugin-backed actions).

Source code in packages/genkit/src/genkit/_core/_plugin.py
40
41
42
43
44
45
46
47
48
@abc.abstractmethod
async def list_actions(self) -> list[ActionMetadata]:
    """Return advertised actions for dev UI/reflection listing.

    ``ActionMetadata.action_type`` must be set (typically ``ActionKind.*``) and
    ``ActionMetadata.name`` must match resolution keys (typically
    ``{plugin.name}/localName`` for plugin-backed actions).
    """
    ...

model(name) async

Resolve a model action by name (local or namespaced).

Source code in packages/genkit/src/genkit/_core/_plugin.py
50
51
52
53
async def model(self, name: str) -> Action | None:
    """Resolve a model action by name (local or namespaced)."""
    target = name if '/' in name else f'{self.name}/{name}'
    return await self.resolve(ActionKind.MODEL, target)

resolve(action_type, name) abstractmethod async

Resolve a single action by kind and namespaced name.

Source code in packages/genkit/src/genkit/_core/_plugin.py
35
36
37
38
@abc.abstractmethod
async def resolve(self, action_type: ActionKind, name: str) -> Action | None:
    """Resolve a single action by kind and namespaced name."""
    ...

genkit.Action

Bases: Generic[InputT, OutputT, ChunkT]

A named, traced, remotely callable function.

Source code in packages/genkit/src/genkit/_core/_action.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
class Action(Generic[InputT, OutputT, ChunkT]):
    """A named, traced, remotely callable function."""

    def __init__(
        self,
        kind: ActionKind,
        name: str,
        fn: Callable[..., Awaitable[OutputT]],
        metadata_fn: Callable[..., object] | None = None,
        description: str | None = None,
        metadata: dict[str, object] | None = None,
        span_metadata: dict[str, SpanAttributeValue] | None = None,
    ) -> None:
        self._kind: ActionKind = kind
        self._name: str = name
        self._metadata: dict[str, object] = metadata if metadata else {}
        self._description: str | None = description
        self._span_metadata: dict[str, SpanAttributeValue] = span_metadata or {}
        # Optional matcher function for resource actions
        self.matches: Callable[[object], bool] | None = None

        # All action handlers must be async
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"Action handlers must be async functions. Got sync function for '{name}'.")

        input_spec = inspect.getfullargspec(metadata_fn if metadata_fn else fn)
        try:
            resolved_annotations = get_type_hints(metadata_fn if metadata_fn else fn)
        except (NameError, TypeError, AttributeError):
            resolved_annotations = input_spec.annotations
        action_args, arg_types = extract_action_args_and_types(input_spec, resolved_annotations)
        # Raw user fn; tracing/dispatch handled by _run_with_telemetry / _invoke.
        self._fn: Callable[..., Awaitable[OutputT]] = fn
        self._n_action_args: int = len(action_args)
        self._initialize_io_schemas(action_args, arg_types, resolved_annotations, input_spec)

    @property
    def kind(self) -> ActionKind:
        return self._kind

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> str | None:
        return self._description

    @property
    def metadata(self) -> dict[str, object]:
        return self._metadata

    @property
    def input_type(self) -> TypeAdapter[InputT] | None:
        return self._input_type

    @property
    def input_schema(self) -> dict[str, object]:
        return self._input_schema

    @input_schema.setter
    def input_schema(self, value: dict[str, object]) -> None:
        self._input_schema = value
        self._metadata[ActionMetadataKey.INPUT_KEY] = value

    @property
    def output_schema(self) -> dict[str, object]:
        return self._output_schema

    @output_schema.setter
    def output_schema(self, value: dict[str, object]) -> None:
        self._output_schema = value
        self._metadata[ActionMetadataKey.OUTPUT_KEY] = value

    def _override_input_schema(
        self,
        input_schema: type[BaseModel] | dict[str, object],
    ) -> None:
        """Replace inferred input JSON Schema and validation type (e.g. tool schema overrides)."""
        in_js = to_json_schema(input_schema)
        self.input_schema = in_js
        if isinstance(input_schema, dict):
            self._input_type = None
        else:
            self._input_type = cast(TypeAdapter[InputT], TypeAdapter(input_schema))

    async def __call__(self, input: InputT | None = None) -> OutputT:
        """Call the action directly, returning just the response value."""
        return (await self.run(input)).response

    async def run(
        self,
        input: InputT | None = None,
        on_chunk: Callable[[ChunkT], None] | None = None,
        context: dict[str, object] | None = None,
        on_trace_start: Callable[[str, str], Awaitable[None]] | None = None,
        telemetry_labels: dict[str, object] | None = None,
    ) -> ActionResponse[OutputT]:
        """Execute the action with optional input validation.

        Args:
            input: The input to the action. Will be validated against the input schema.
            on_chunk: Optional streaming callback for chunked responses.
            context: Optional context dict for the action.
            on_trace_start: Optional callback invoked when trace starts.
            telemetry_labels: Custom labels to set as direct span attributes.

        Returns:
            ActionResponse containing the result and trace metadata.

        Raises:
            GenkitError: If input validation fails (INVALID_ARGUMENT status).
        """
        # Validate input if we have a schema
        if self._input_type is not None:
            try:
                input = self._input_type.validate_python(input)
            except ValidationError as e:
                if input is None:
                    raise GenkitError(
                        message=(
                            f"Action '{self.name}' requires input but none was provided. "
                            'Please supply a valid input payload.'
                        ),
                        status='INVALID_ARGUMENT',
                    ) from e
                raise GenkitError(
                    message=f"Invalid input for action '{self.name}': {e}",
                    status='INVALID_ARGUMENT',
                    cause=e,
                ) from e

        if context:
            _ = _action_context.set(context)

        streaming_cb = cast(StreamingCallback, on_chunk) if on_chunk else None

        return await self._run_with_telemetry(
            input,
            ActionRunContext(
                context=_action_context.get(None),
                streaming_callback=streaming_cb,
            ),
            on_trace_start,
            telemetry_labels,
        )

    def stream(
        self,
        input: InputT | None = None,
        context: dict[str, object] | None = None,
        telemetry_labels: dict[str, object] | None = None,
        timeout: float | None = None,
    ) -> StreamResponse[ChunkT, OutputT]:
        """Execute and return a StreamResponse with .stream and .response properties."""
        channel: Channel[ChunkT, ActionResponse[OutputT]] = Channel(timeout=timeout)

        def send_chunk(c: ChunkT) -> None:
            channel.send(c)

        resp = self.run(
            input=input,
            context=context,
            telemetry_labels=telemetry_labels,
            on_chunk=send_chunk,
        )
        channel.set_close_future(asyncio.create_task(resp))

        result_future: asyncio.Future[OutputT] = asyncio.Future()
        channel.closed.add_done_callback(lambda _: result_future.set_result(channel.closed.result().response))

        return StreamResponse(stream=channel, response=result_future)

    def _initialize_io_schemas(
        self,
        action_args: list[str],
        arg_types: list[type],
        annotations: dict[str, Any],
        _input_spec: inspect.FullArgSpec,
    ) -> None:
        # Allow up to 2 args: (input, ctx) - use ctx.send_chunk() for streaming
        if len(action_args) > 2:
            raise TypeError(f'can only have up to 2 args: {action_args}')

        if len(action_args) > 0:
            type_adapter = TypeAdapter(arg_types[0])
            self._input_schema: dict[str, object] = type_adapter.json_schema()
            self._input_type: TypeAdapter[InputT] | None = cast(TypeAdapter[InputT], type_adapter)
            self._metadata[ActionMetadataKey.INPUT_KEY] = self._input_schema
        else:
            self._input_schema = TypeAdapter(object).json_schema()
            self._input_type = None
            self._metadata[ActionMetadataKey.INPUT_KEY] = self._input_schema

        if ActionMetadataKey.RETURN in annotations:
            type_adapter = TypeAdapter(annotations[ActionMetadataKey.RETURN])
            self._output_schema: dict[str, object] = type_adapter.json_schema()
            self._metadata[ActionMetadataKey.OUTPUT_KEY] = self._output_schema
        else:
            self._output_schema = TypeAdapter(object).json_schema()
            self._metadata[ActionMetadataKey.OUTPUT_KEY] = self._output_schema

    async def _run_with_telemetry(
        self,
        input: object | None,
        ctx: ActionRunContext,
        on_trace_start: Callable[[str, str], Awaitable[None]] | None,
        telemetry_labels: dict[str, object] | None,
    ) -> ActionResponse[OutputT]:
        """Open the action span via ``run_in_new_span``, dispatch ``self._fn``, wrap errors in ``GenkitError``."""
        start_time = time.perf_counter()
        suppress = str((telemetry_labels or {}).get('genkitx:ignore-trace', '')).lower() == 'true'
        suppress_token = suppress_telemetry.set(True) if suppress else None

        # ``type``/``subtype`` set canonical genkit:type / genkit:metadata:subtype attrs.
        # ``self._span_metadata`` uses short keys; run_in_new_span auto-prefixes them with
        # ``genkit:metadata:``. ``telemetry_labels`` are caller-controlled passthrough attrs.
        extra_metadata: dict[str, str] = {k: str(v) for k, v in self._span_metadata.items()}
        # Surface action context (auth, headers, etc.) on the span so the Dev UI
        # trace inspector can render the "Context" panel for a flow run.
        if ctx.context:
            try:
                extra_metadata['context'] = json.dumps(ctx.context)
            except Exception:
                try:
                    cleaned_context = _sanitize_value(ctx.context)
                    extra_metadata['context'] = json.dumps(cleaned_context)
                except Exception:
                    extra_metadata['context'] = str(ctx.context)
        span_meta = SpanMetadata(
            name=self._name,
            type='action',
            subtype=str(self._kind),
            input=input,
            metadata=extra_metadata or None,
            telemetry_labels={k: str(v) for k, v in (telemetry_labels or {}).items()} or None,
        )

        trace_id = ''
        try:
            with run_in_new_span(span_meta) as span:
                # OpenTelemetry standard hex format.
                trace_id = format(span.get_span_context().trace_id, '032x')
                span_id = format(span.get_span_context().span_id, '016x')
                if on_trace_start:
                    await on_trace_start(trace_id, span_id)

                output = await self._invoke(input, ctx)
                output = cast(OutputT, _record_latency(output, start_time))
                # Picked up by run_in_new_span's success branch and written as ``genkit:output``.
                span_meta.output = output
                return ActionResponse(response=output, trace_id=trace_id, span_id=span_id)
        except GenkitError:
            raise
        except Exception as e:
            # Wrap outside the with-block so we don't clobber ``genkit:error`` (which
            # ``run_in_new_span`` already set to ``str(original_e)``).
            raise GenkitError(
                cause=e,
                message=f'Error while running action {self._name}',
                trace_id=trace_id,
            ) from e
        finally:
            if suppress_token is not None:
                suppress_telemetry.reset(suppress_token)

    async def _invoke(self, input: object | None, ctx: ActionRunContext) -> OutputT:
        """Dispatch ``self._fn`` based on its declared arity (0/1/2 args)."""
        match self._n_action_args:
            case 0:
                return await self._fn()
            case 1:
                return await self._fn(input)
            case 2:
                return await self._fn(input, ctx)
            case _:
                raise ValueError('action fn must have 0-2 args')

__call__(input=None) async

Call the action directly, returning just the response value.

Source code in packages/genkit/src/genkit/_core/_action.py
439
440
441
async def __call__(self, input: InputT | None = None) -> OutputT:
    """Call the action directly, returning just the response value."""
    return (await self.run(input)).response

run(input=None, on_chunk=None, context=None, on_trace_start=None, telemetry_labels=None) async

Execute the action with optional input validation.

Parameters:

Name Type Description Default
input InputT | None

The input to the action. Will be validated against the input schema.

None
on_chunk Callable[[ChunkT], None] | None

Optional streaming callback for chunked responses.

None
context dict[str, object] | None

Optional context dict for the action.

None
on_trace_start Callable[[str, str], Awaitable[None]] | None

Optional callback invoked when trace starts.

None
telemetry_labels dict[str, object] | None

Custom labels to set as direct span attributes.

None

Returns:

Type Description
ActionResponse[OutputT]

ActionResponse containing the result and trace metadata.

Raises:

Type Description
GenkitError

If input validation fails (INVALID_ARGUMENT status).

Source code in packages/genkit/src/genkit/_core/_action.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
async def run(
    self,
    input: InputT | None = None,
    on_chunk: Callable[[ChunkT], None] | None = None,
    context: dict[str, object] | None = None,
    on_trace_start: Callable[[str, str], Awaitable[None]] | None = None,
    telemetry_labels: dict[str, object] | None = None,
) -> ActionResponse[OutputT]:
    """Execute the action with optional input validation.

    Args:
        input: The input to the action. Will be validated against the input schema.
        on_chunk: Optional streaming callback for chunked responses.
        context: Optional context dict for the action.
        on_trace_start: Optional callback invoked when trace starts.
        telemetry_labels: Custom labels to set as direct span attributes.

    Returns:
        ActionResponse containing the result and trace metadata.

    Raises:
        GenkitError: If input validation fails (INVALID_ARGUMENT status).
    """
    # Validate input if we have a schema
    if self._input_type is not None:
        try:
            input = self._input_type.validate_python(input)
        except ValidationError as e:
            if input is None:
                raise GenkitError(
                    message=(
                        f"Action '{self.name}' requires input but none was provided. "
                        'Please supply a valid input payload.'
                    ),
                    status='INVALID_ARGUMENT',
                ) from e
            raise GenkitError(
                message=f"Invalid input for action '{self.name}': {e}",
                status='INVALID_ARGUMENT',
                cause=e,
            ) from e

    if context:
        _ = _action_context.set(context)

    streaming_cb = cast(StreamingCallback, on_chunk) if on_chunk else None

    return await self._run_with_telemetry(
        input,
        ActionRunContext(
            context=_action_context.get(None),
            streaming_callback=streaming_cb,
        ),
        on_trace_start,
        telemetry_labels,
    )

stream(input=None, context=None, telemetry_labels=None, timeout=None)

Execute and return a StreamResponse with .stream and .response properties.

Source code in packages/genkit/src/genkit/_core/_action.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def stream(
    self,
    input: InputT | None = None,
    context: dict[str, object] | None = None,
    telemetry_labels: dict[str, object] | None = None,
    timeout: float | None = None,
) -> StreamResponse[ChunkT, OutputT]:
    """Execute and return a StreamResponse with .stream and .response properties."""
    channel: Channel[ChunkT, ActionResponse[OutputT]] = Channel(timeout=timeout)

    def send_chunk(c: ChunkT) -> None:
        channel.send(c)

    resp = self.run(
        input=input,
        context=context,
        telemetry_labels=telemetry_labels,
        on_chunk=send_chunk,
    )
    channel.set_close_future(asyncio.create_task(resp))

    result_future: asyncio.Future[OutputT] = asyncio.Future()
    channel.closed.add_done_callback(lambda _: result_future.set_result(channel.closed.result().response))

    return StreamResponse(stream=channel, response=result_future)

genkit.Flow = Action module-attribute

genkit.ActionKind

Bases: StrEnum

Types of actions that can be registered.

Source code in packages/genkit/src/genkit/_core/_action.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class ActionKind(StrEnum):
    """Types of actions that can be registered."""

    BACKGROUND_MODEL = 'background-model'
    CANCEL_OPERATION = 'cancel-operation'
    CHECK_OPERATION = 'check-operation'
    CUSTOM = 'custom'
    DYNAMIC_ACTION_PROVIDER = 'dynamic-action-provider'
    EMBEDDER = 'embedder'
    EVALUATOR = 'evaluator'
    EXECUTABLE_PROMPT = 'executable-prompt'
    FLOW = 'flow'
    INDEXER = 'indexer'
    MODEL = 'model'
    PROMPT = 'prompt'
    RERANKER = 'reranker'
    RESOURCE = 'resource'
    RETRIEVER = 'retriever'
    TOOL = 'tool'
    UTIL = 'util'

genkit.ActionRunContext

Execution context for an action.

Provides read-only access to action context (auth, metadata) and streaming support.

Source code in packages/genkit/src/genkit/_core/_action.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
class ActionRunContext:
    """Execution context for an action.

    Provides read-only access to action context (auth, metadata) and streaming support.
    """

    def __init__(
        self,
        context: dict[str, object] | None = None,
        streaming_callback: StreamingCallback | None = None,
    ) -> None:
        self._context: dict[str, object] = context if context is not None else {}
        self._streaming_callback = streaming_callback

    @property
    def context(self) -> dict[str, object]:
        return self._context

    @property
    def is_streaming(self) -> bool:
        """Returns True if a streaming callback is registered."""
        return self._streaming_callback is not None

    @property
    def streaming_callback(self) -> StreamingCallback | None:
        """Returns the streaming callback, if any.

        Use this when you need to pass the callback to another action.
        For sending chunks directly, use send_chunk() instead.
        """
        return self._streaming_callback

    def send_chunk(self, chunk: object) -> None:
        """Send a streaming chunk to the client.

        Args:
            chunk: The chunk data to stream.
        """
        if self._streaming_callback is not None:
            self._streaming_callback(chunk)

    @staticmethod
    def _current_context() -> dict[str, object] | None:
        return _action_context.get(None)

is_streaming property

Returns True if a streaming callback is registered.

streaming_callback property

Returns the streaming callback, if any.

Use this when you need to pass the callback to another action. For sending chunks directly, use send_chunk() instead.

send_chunk(chunk)

Send a streaming chunk to the client.

Parameters:

Name Type Description Default
chunk object

The chunk data to stream.

required
Source code in packages/genkit/src/genkit/_core/_action.py
339
340
341
342
343
344
345
346
def send_chunk(self, chunk: object) -> None:
    """Send a streaming chunk to the client.

    Args:
        chunk: The chunk data to stream.
    """
    if self._streaming_callback is not None:
        self._streaming_callback(chunk)

genkit.ExecutablePrompt

Bases: Generic[InputT, OutputT]

A callable prompt with typed input/output that generates AI responses.

Source code in packages/genkit/src/genkit/_ai/_prompt.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
class ExecutablePrompt(Generic[InputT, OutputT]):
    """A callable prompt with typed input/output that generates AI responses."""

    def __init__(
        self,
        registry: Registry,
        variant: str | None = None,
        model: str | None = None,
        config: dict[str, Any] | ModelConfig | None = None,
        description: str | None = None,
        input_schema: type | dict[str, Any] | str | None = None,
        system: str | list[Part] | None = None,
        prompt: str | list[Part] | None = None,
        messages: str | list[Message] | None = None,
        output_format: str | None = None,
        output_content_type: str | None = None,
        output_instructions: str | None = None,
        output_schema: type | dict[str, Any] | str | None = None,
        output_constrained: bool | None = None,
        max_turns: int | None = None,
        return_tool_requests: bool | None = None,
        metadata: dict[str, Any] | None = None,
        tools: Sequence[str | Tool] | None = None,
        tool_choice: ToolChoice | None = None,
        use: list[ModelMiddleware] | None = None,
        docs: list[Document] | None = None,
        resources: list[str] | None = None,
        name: str | None = None,
        ns: str | None = None,
    ) -> None:
        """Initialize prompt with configuration, templates, and schema options."""
        self._registry = registry
        self._variant = variant
        self._model = model
        self._config = config
        self._description = description
        self._input_schema = input_schema
        self._system = system
        self._prompt = prompt
        self._messages = messages
        self._output_format = output_format
        self._output_content_type = output_content_type
        self._output_instructions = output_instructions
        self._output_schema = output_schema
        self._output_constrained = output_constrained
        self._max_turns = max_turns
        self._return_tool_requests = return_tool_requests
        self._metadata = metadata
        self._tools = tools
        self._tool_choice = tool_choice
        self._use = use
        self._docs = docs
        self._resources = resources
        self._cache_prompt: PromptCache = PromptCache()
        self._name = name
        self._ns = ns
        self._prompt_action: Action | None = None

    @property
    def ref(self) -> dict[str, Any]:
        """Reference object with prompt name and metadata."""
        return {
            'name': registry_definition_key(self._name, self._variant, self._ns) if self._name else None,
            'metadata': self._metadata,
        }

    async def _ensure_resolved(self) -> None:
        if self._prompt_action or not self._name:
            return

        # Preserve Pydantic schema type if it was explicitly provided via ai.prompt(..., output=Output(schema=T))
        # The resolved prompt from .prompt file will have a dict schema, but we want to keep the Pydantic type
        # for runtime validation to get proper typed output.
        original_output_schema = self._output_schema

        resolved = await lookup_prompt(self._registry, self._name, self._variant)
        self._model = resolved._model
        self._config = resolved._config
        self._description = resolved._description
        self._input_schema = resolved._input_schema
        self._system = resolved._system
        self._prompt = resolved._prompt
        self._messages = resolved._messages
        self._output_format = resolved._output_format
        self._output_content_type = resolved._output_content_type
        self._output_instructions = resolved._output_instructions
        # Keep original Pydantic type if provided, otherwise use resolved (dict) schema
        if isinstance(original_output_schema, type) and issubclass(original_output_schema, BaseModel):
            self._output_schema = original_output_schema
        else:
            self._output_schema = resolved._output_schema
        self._output_constrained = resolved._output_constrained
        self._max_turns = resolved._max_turns
        self._return_tool_requests = resolved._return_tool_requests
        self._metadata = resolved._metadata
        self._tools = resolved._tools
        self._tool_choice = resolved._tool_choice
        self._use = resolved._use
        self._docs = resolved._docs
        self._resources = resolved._resources
        self._prompt_action = resolved._prompt_action

    async def __call__(
        self,
        input: InputT | dict[str, Any] | None = None,
        **opts: Unpack[PromptGenerateOptions],
    ) -> ModelResponse[OutputT]:
        """Execute the prompt and return the response.

        Args:
            input: Template variables for rendering.
        """
        return await self._call_impl(input, opts)  # ty: ignore[invalid-argument-type]  # ty doesn't infer Unpack[TD] as TD in function body (PEP 692 gap)

    async def _call_impl(
        self,
        input: InputT | dict[str, Any] | None,
        opts: PromptGenerateOptions,
    ) -> ModelResponse[OutputT]:
        """Execute the prompt with resolved opts. Used by __call__ and stream."""
        await self._ensure_resolved()
        on_chunk = opts.get('on_chunk')
        middleware = opts.get('use') or self._use
        context = opts.get('context')
        prompt_config = self._prompt_config_for_call(opts)
        registry = await registry_with_inline_tools(self._registry, prompt_config.tools)
        gen_options = await executable_prompt_call_to_generate_options(self, registry, prompt_config, input, opts)
        result = await generate_action(
            registry,
            gen_options,
            on_chunk=on_chunk,
            middleware=middleware,
            context=context if context else ActionRunContext._current_context(),  # pyright: ignore[reportPrivateUsage]
        )
        return cast(ModelResponse[OutputT], result)

    def _prompt_config_for_call(self, opts: PromptGenerateOptions) -> PromptConfig:
        """Merge this prompt's definition with per-call ``opts`` into a :class:`PromptConfig`."""
        output_opts = opts.get('output') or {}
        merged_config: dict[str, Any] | ModelConfig | None
        if opts.get('config') is not None:
            base = (
                self._config.model_dump(exclude_none=True)
                if isinstance(self._config, BaseModel)
                else (self._config or {})
            )
            opt_config = opts.get('config')
            override = (
                opt_config.model_dump(exclude_none=True) if isinstance(opt_config, BaseModel) else (opt_config or {})
            )
            merged_config = {**base, **override} if base or override else None
        else:
            merged_config = self._config

        merged_metadata = (
            {**(self._metadata or {}), **(opts.get('metadata') or {})} if opts.get('metadata') else self._metadata
        )

        def _or(opt_val: Any, default: Any) -> Any:  # noqa: ANN401
            return opt_val if opt_val is not None else default

        return PromptConfig(
            model=opts.get('model') or self._model,
            prompt=self._prompt,
            system=self._system,
            messages=self._messages,
            tools=opts.get('tools') or self._tools,
            return_tool_requests=_or(opts.get('return_tool_requests'), self._return_tool_requests),
            tool_choice=opts.get('tool_choice') or self._tool_choice,
            config=merged_config,
            max_turns=_or(opts.get('max_turns'), self._max_turns),
            output_format=output_opts.get('format') or self._output_format,
            output_content_type=output_opts.get('content_type') or self._output_content_type,
            output_instructions=_or(output_opts.get('instructions'), self._output_instructions),
            output_schema=output_opts.get('schema') or output_opts.get('json_schema') or self._output_schema,
            output_constrained=_or(output_opts.get('constrained'), self._output_constrained),
            input_schema=self._input_schema,
            metadata=merged_metadata,
            docs=self._docs,
            resources=opts.get('resources') or self._resources,
            resume_respond=opts.get('resume_respond'),
            resume_restart=opts.get('resume_restart'),
            resume_metadata=opts.get('resume_metadata'),
        )

    def stream(
        self,
        input: InputT | dict[str, Any] | None = None,
        *,
        timeout: float | None = None,
        **opts: Unpack[PromptGenerateOptions],
    ) -> ModelStreamResponse[OutputT]:
        """Stream the prompt execution, returning (stream, response_future)."""
        channel: Channel[ModelResponseChunk, ModelResponse[OutputT]] = Channel(timeout=timeout)
        stream_opts: PromptGenerateOptions = {
            **opts,  # ty doesn't infer Unpack[TD] as TD in function body (PEP 692 gap)
            'on_chunk': lambda c: channel.send(cast(ModelResponseChunk, c)),
        }
        resp = self._call_impl(input, stream_opts)
        response_future: asyncio.Future[ModelResponse[OutputT]] = asyncio.create_task(resp)
        channel.set_close_future(response_future)

        return ModelStreamResponse[OutputT](channel=channel, response_future=response_future)

    async def render(
        self,
        input: InputT | dict[str, Any] | None = None,
        **opts: Unpack[PromptGenerateOptions],
    ) -> GenerateActionOptions:
        """Render the prompt template without executing, returning GenerateActionOptions.

        Same keyword options as ``__call__`` (see PromptGenerateOptions).
        """
        call_opts: PromptGenerateOptions = opts  # ty: ignore[invalid-assignment]  # ty treats **opts as a plain dict here; callers are still validated against PromptGenerateOptions.
        await self._ensure_resolved()
        prompt_config = self._prompt_config_for_call(call_opts)
        registry = await registry_with_inline_tools(self._registry, prompt_config.tools)
        return await executable_prompt_call_to_generate_options(self, registry, prompt_config, input, call_opts)

ref property

Reference object with prompt name and metadata.

__call__(input=None, **opts) async

Execute the prompt and return the response.

Parameters:

Name Type Description Default
input InputT | dict[str, Any] | None

Template variables for rendering.

None
Source code in packages/genkit/src/genkit/_ai/_prompt.py
326
327
328
329
330
331
332
333
334
335
336
async def __call__(
    self,
    input: InputT | dict[str, Any] | None = None,
    **opts: Unpack[PromptGenerateOptions],
) -> ModelResponse[OutputT]:
    """Execute the prompt and return the response.

    Args:
        input: Template variables for rendering.
    """
    return await self._call_impl(input, opts)  # ty: ignore[invalid-argument-type]  # ty doesn't infer Unpack[TD] as TD in function body (PEP 692 gap)

__init__(registry, variant=None, model=None, config=None, description=None, input_schema=None, system=None, prompt=None, messages=None, output_format=None, output_content_type=None, output_instructions=None, output_schema=None, output_constrained=None, max_turns=None, return_tool_requests=None, metadata=None, tools=None, tool_choice=None, use=None, docs=None, resources=None, name=None, ns=None)

Initialize prompt with configuration, templates, and schema options.

Source code in packages/genkit/src/genkit/_ai/_prompt.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def __init__(
    self,
    registry: Registry,
    variant: str | None = None,
    model: str | None = None,
    config: dict[str, Any] | ModelConfig | None = None,
    description: str | None = None,
    input_schema: type | dict[str, Any] | str | None = None,
    system: str | list[Part] | None = None,
    prompt: str | list[Part] | None = None,
    messages: str | list[Message] | None = None,
    output_format: str | None = None,
    output_content_type: str | None = None,
    output_instructions: str | None = None,
    output_schema: type | dict[str, Any] | str | None = None,
    output_constrained: bool | None = None,
    max_turns: int | None = None,
    return_tool_requests: bool | None = None,
    metadata: dict[str, Any] | None = None,
    tools: Sequence[str | Tool] | None = None,
    tool_choice: ToolChoice | None = None,
    use: list[ModelMiddleware] | None = None,
    docs: list[Document] | None = None,
    resources: list[str] | None = None,
    name: str | None = None,
    ns: str | None = None,
) -> None:
    """Initialize prompt with configuration, templates, and schema options."""
    self._registry = registry
    self._variant = variant
    self._model = model
    self._config = config
    self._description = description
    self._input_schema = input_schema
    self._system = system
    self._prompt = prompt
    self._messages = messages
    self._output_format = output_format
    self._output_content_type = output_content_type
    self._output_instructions = output_instructions
    self._output_schema = output_schema
    self._output_constrained = output_constrained
    self._max_turns = max_turns
    self._return_tool_requests = return_tool_requests
    self._metadata = metadata
    self._tools = tools
    self._tool_choice = tool_choice
    self._use = use
    self._docs = docs
    self._resources = resources
    self._cache_prompt: PromptCache = PromptCache()
    self._name = name
    self._ns = ns
    self._prompt_action: Action | None = None

render(input=None, **opts) async

Render the prompt template without executing, returning GenerateActionOptions.

Same keyword options as __call__ (see PromptGenerateOptions).

Source code in packages/genkit/src/genkit/_ai/_prompt.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
async def render(
    self,
    input: InputT | dict[str, Any] | None = None,
    **opts: Unpack[PromptGenerateOptions],
) -> GenerateActionOptions:
    """Render the prompt template without executing, returning GenerateActionOptions.

    Same keyword options as ``__call__`` (see PromptGenerateOptions).
    """
    call_opts: PromptGenerateOptions = opts  # ty: ignore[invalid-assignment]  # ty treats **opts as a plain dict here; callers are still validated against PromptGenerateOptions.
    await self._ensure_resolved()
    prompt_config = self._prompt_config_for_call(call_opts)
    registry = await registry_with_inline_tools(self._registry, prompt_config.tools)
    return await executable_prompt_call_to_generate_options(self, registry, prompt_config, input, call_opts)

stream(input=None, *, timeout=None, **opts)

Stream the prompt execution, returning (stream, response_future).

Source code in packages/genkit/src/genkit/_ai/_prompt.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def stream(
    self,
    input: InputT | dict[str, Any] | None = None,
    *,
    timeout: float | None = None,
    **opts: Unpack[PromptGenerateOptions],
) -> ModelStreamResponse[OutputT]:
    """Stream the prompt execution, returning (stream, response_future)."""
    channel: Channel[ModelResponseChunk, ModelResponse[OutputT]] = Channel(timeout=timeout)
    stream_opts: PromptGenerateOptions = {
        **opts,  # ty doesn't infer Unpack[TD] as TD in function body (PEP 692 gap)
        'on_chunk': lambda c: channel.send(cast(ModelResponseChunk, c)),
    }
    resp = self._call_impl(input, stream_opts)
    response_future: asyncio.Future[ModelResponse[OutputT]] = asyncio.create_task(resp)
    channel.set_close_future(response_future)

    return ModelStreamResponse[OutputT](channel=channel, response_future=response_future)

genkit.PromptGenerateOptions

Bases: TypedDict

Runtime options for prompt execution (config, tools, messages, etc.).

Source code in packages/genkit/src/genkit/_ai/_prompt.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class PromptGenerateOptions(TypedDict, total=False):
    """Runtime options for prompt execution (config, tools, messages, etc.)."""

    model: str | None
    config: dict[str, Any] | ModelConfig | None
    messages: list[Message] | None
    docs: list[Document] | None
    tools: Sequence[str | Tool] | None
    resources: list[str] | None
    tool_choice: ToolChoice | None
    output: OutputOptions | None
    resume_respond: ToolResponsePart | list[ToolResponsePart] | None
    resume_restart: ToolRequestPart | list[ToolRequestPart] | None
    resume_metadata: dict[str, Any] | None
    return_tool_requests: bool | None
    max_turns: int | None
    on_chunk: ModelStreamingCallback | None
    use: list[ModelMiddleware] | None
    context: dict[str, Any] | None
    step_name: str | None
    metadata: dict[str, Any] | None

genkit.Tool

A registered tool: a callable handle backed by an :class:~genkit._core._action.Action.

Obtain instances via :func:define_tool, :func:define_interrupt, :func:tool, or the @ai.tool decorator rather than constructing directly.

Source code in packages/genkit/src/genkit/_ai/_tools.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class Tool:
    """A registered tool: a callable handle backed by an :class:`~genkit._core._action.Action`.

    Obtain instances via :func:`define_tool`, :func:`define_interrupt`, :func:`tool`, or the
    ``@ai.tool`` decorator rather than constructing directly.
    """

    def __init__(self, action: Action) -> None:
        self._action = action

    @property
    def name(self) -> str:
        """Tool name (registry key)."""
        return self._action.name

    @property
    def description(self) -> str:
        """Human-readable description sent to the model."""
        return self._action.description or ''

    @property
    def input_schema(self) -> dict[str, object] | None:
        """JSON Schema for the tool's input, as sent on the wire."""
        return self._action.input_schema

    @property
    def output_schema(self) -> dict[str, object] | None:
        """JSON Schema for the tool's output."""
        return self._action.output_schema

    def definition(self) -> ToolDefinition:
        """Return the wire-format ToolDefinition for this tool."""
        return ToolDefinition(
            name=self.name,
            description=self.description,
            input_schema=self.input_schema,
            output_schema=self.output_schema,
        )

    def action(self) -> Action[Any, Any, Any]:
        """Return the underlying :class:`~genkit._core._action.Action` registered for this tool."""
        return self._action

    async def __call__(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
        """Run the tool and return the unwrapped response value."""
        return (await self._action.run(*args, **kwargs)).response

description property

Human-readable description sent to the model.

input_schema property

JSON Schema for the tool's input, as sent on the wire.

name property

Tool name (registry key).

output_schema property

JSON Schema for the tool's output.

__call__(*args, **kwargs) async

Run the tool and return the unwrapped response value.

Source code in packages/genkit/src/genkit/_ai/_tools.py
77
78
79
async def __call__(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
    """Run the tool and return the unwrapped response value."""
    return (await self._action.run(*args, **kwargs)).response

action()

Return the underlying :class:~genkit._core._action.Action registered for this tool.

Source code in packages/genkit/src/genkit/_ai/_tools.py
73
74
75
def action(self) -> Action[Any, Any, Any]:
    """Return the underlying :class:`~genkit._core._action.Action` registered for this tool."""
    return self._action

definition()

Return the wire-format ToolDefinition for this tool.

Source code in packages/genkit/src/genkit/_ai/_tools.py
64
65
66
67
68
69
70
71
def definition(self) -> ToolDefinition:
    """Return the wire-format ToolDefinition for this tool."""
    return ToolDefinition(
        name=self.name,
        description=self.description,
        input_schema=self.input_schema,
        output_schema=self.output_schema,
    )

genkit.tool(func, *, name=None, description=None, input_schema=None)

Dynamically define a tool that can passed into a generate call.

Compared to define_tool, the tool constructor doesn't register the tool. The Tool instance cannot be referenced by name later.

Use when there are dynamic or ephemeral tools that need to be available for a particular generate call.

Parameters:

Name Type Description Default
func Callable[..., Any]

Async tool implementation (same 0–2 argument rules as :func:define_tool).

required
name str | None

Tool name for the model. Defaults to func.__name__.

None
description str | None

Sent to the model. Defaults to the function docstring.

None
input_schema type[BaseModel] | dict[str, object] | None

Optional input schema override (Pydantic model or JSON-schema dict).

None

Raises:

Type Description
TypeError

If func is not a coroutine function.

ValueError

If no name is given and func has no __name__.

Source code in packages/genkit/src/genkit/_ai/_tools.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def tool(
    func: Callable[..., Any],
    *,
    name: str | None = None,
    description: str | None = None,
    input_schema: type[BaseModel] | dict[str, object] | None = None,
) -> Tool:
    """Dynamically define a tool that can passed into a `generate` call.

    Compared to `define_tool`, the `tool` constructor doesn't register the tool.
    The Tool instance cannot be referenced by name later.

    Use when there are dynamic or ephemeral tools that need to be available
    for a particular `generate` call.

    Args:
        func: Async tool implementation (same 0–2 argument rules as :func:`define_tool`).
        name: Tool name for the model. Defaults to ``func.__name__``.
        description: Sent to the model. Defaults to the function docstring.
        input_schema: Optional input schema override (Pydantic model or JSON-schema dict).

    Raises:
        TypeError: If ``func`` is not a coroutine function.
        ValueError: If no ``name`` is given and ``func`` has no ``__name__``.
    """
    return _define_tool(Registry(), func, name, description, input_schema=input_schema)

genkit.respond_to_interrupt(response, *, interrupt, metadata=None)

Build a ToolResponsePart for a pending tool interrupt.

Pass the return value to generate(..., resume_respond=interrupt_response).

Parameters:

Name Type Description Default
response Any

Tool output / user reply for this interrupt.

required
interrupt ToolRequestPart

The interrupted ToolRequestPart (e.g. from response.interrupts).

required
metadata dict[str, Any] | None

Optional metadata for the interrupt response channel.

None
Source code in packages/genkit/src/genkit/_ai/_tools.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def respond_to_interrupt(
    response: Any,  # noqa: ANN401 - user reply or tool output for resume_respond
    *,
    interrupt: ToolRequestPart,
    metadata: dict[str, Any] | None = None,
) -> ToolResponsePart:
    """Build a ``ToolResponsePart`` for a pending tool interrupt.

    Pass the return value to ``generate(..., resume_respond=interrupt_response)``.

    Args:
        response: Tool output / user reply for this interrupt.
        interrupt: The interrupted ``ToolRequestPart`` (e.g. from ``response.interrupts``).
        metadata: Optional metadata for the interrupt response channel.
    """
    return _tool_response_part(interrupt, response, metadata)

genkit.restart_tool(interrupt, *, resumed_metadata=None, replace_input=None)

Build a restart ToolRequestPart for a pending tool interrupt.

Pass the return value to generate(..., resume_restart=...).

Parameters:

Name Type Description Default
interrupt ToolRequestPart

The interrupted ToolRequestPart (e.g. from response.interrupts).

required
resumed_metadata dict[str, Any] | None

Passed to the tool as ToolRunContext.resumed_metadata. The common case is a small dict the tool / middleware checks (e.g. {'toolApproved': True} for ToolApproval).

None
replace_input Any | None

Optional new tool_request.input for this run; the previous input is stashed in metadata.replacedInput so the tool can see what changed.

None

Returns:

Type Description
ToolRequestPart

A ToolRequestPart for resume_restart / message history.

Example

restart_tool(trp, resumed_metadata={'toolApproved': True})

Source code in packages/genkit/src/genkit/_ai/_tools.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def restart_tool(
    interrupt: ToolRequestPart,
    *,
    resumed_metadata: dict[str, Any] | None = None,
    replace_input: Any | None = None,  # noqa: ANN401 - new tool input; shape is per tool
) -> ToolRequestPart:
    """Build a restart ``ToolRequestPart`` for a pending tool interrupt.

    Pass the return value to ``generate(..., resume_restart=...)``.

    Args:
        interrupt: The interrupted ``ToolRequestPart`` (e.g. from ``response.interrupts``).
        resumed_metadata: Passed to the tool as ``ToolRunContext.resumed_metadata``. The
            common case is a small dict the tool / middleware checks
            (e.g. ``{'toolApproved': True}`` for ``ToolApproval``).
        replace_input: Optional new ``tool_request.input`` for this run; the previous input
            is stashed in ``metadata.replacedInput`` so the tool can see what changed.

    Returns:
        A ``ToolRequestPart`` for ``resume_restart`` / message history.

    Example:
        ``restart_tool(trp, resumed_metadata={'toolApproved': True})``
    """
    tool_req = interrupt.tool_request

    new_meta: dict[str, Any] = dict(interrupt.metadata or {})

    new_meta['resumed'] = resumed_metadata if resumed_metadata is not None else True

    new_input = tool_req.input
    if replace_input is not None:
        new_meta['replacedInput'] = tool_req.input
        new_input = replace_input

    return ToolRequestPart(
        tool_request=ToolRequest(
            name=tool_req.name,
            ref=tool_req.ref,
            input=new_input,
        ),
        metadata=new_meta,
    )

genkit.ToolRunContext

Bases: ActionRunContext

Tool execution context with interrupt support.

Source code in packages/genkit/src/genkit/_ai/_tools.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class ToolRunContext(ActionRunContext):
    """Tool execution context with interrupt support."""

    def __init__(
        self,
        ctx: ActionRunContext,
        resumed_metadata: dict[str, Any] | None = None,
        original_input: Any = None,  # noqa: ANN401 - prior tool_request.input when replacing on restart
    ) -> None:
        """Initialize from parent ActionRunContext.

        Args:
            ctx: Parent action context
            resumed_metadata: Metadata from previous interrupt (if resumed)
            original_input: Original tool input before replacement (if resumed)
        """
        super().__init__(context=ctx.context)
        self.resumed_metadata = resumed_metadata
        self.original_input = original_input

    def is_resumed(self) -> bool:
        """Return True if this execution is resuming after an interrupt."""
        return self.resumed_metadata is not None

__init__(ctx, resumed_metadata=None, original_input=None)

Initialize from parent ActionRunContext.

Parameters:

Name Type Description Default
ctx ActionRunContext

Parent action context

required
resumed_metadata dict[str, Any] | None

Metadata from previous interrupt (if resumed)

None
original_input Any

Original tool input before replacement (if resumed)

None
Source code in packages/genkit/src/genkit/_ai/_tools.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(
    self,
    ctx: ActionRunContext,
    resumed_metadata: dict[str, Any] | None = None,
    original_input: Any = None,  # noqa: ANN401 - prior tool_request.input when replacing on restart
) -> None:
    """Initialize from parent ActionRunContext.

    Args:
        ctx: Parent action context
        resumed_metadata: Metadata from previous interrupt (if resumed)
        original_input: Original tool input before replacement (if resumed)
    """
    super().__init__(context=ctx.context)
    self.resumed_metadata = resumed_metadata
    self.original_input = original_input

is_resumed()

Return True if this execution is resuming after an interrupt.

Source code in packages/genkit/src/genkit/_ai/_tools.py
108
109
110
def is_resumed(self) -> bool:
    """Return True if this execution is resuming after an interrupt."""
    return self.resumed_metadata is not None

genkit.StreamResponse

Bases: Generic[ChunkT_co, OutputT_co]

Wrapper for streaming action results.

Source code in packages/genkit/src/genkit/_core/_action.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class StreamResponse(Generic[ChunkT_co, OutputT_co]):
    """Wrapper for streaming action results."""

    def __init__(
        self,
        stream: AsyncIterator[ChunkT_co],
        response: Awaitable[OutputT_co],
    ) -> None:
        self._stream = stream
        self._response = response

    @property
    def stream(self) -> AsyncIterator[ChunkT_co]:
        return self._stream

    @property
    def response(self) -> Awaitable[OutputT_co]:
        return self._response

genkit.ModelStreamResponse

Bases: Generic[OutputT]

Response from streaming prompt execution with stream and response properties.

Source code in packages/genkit/src/genkit/_ai/_prompt.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class ModelStreamResponse(Generic[OutputT]):
    """Response from streaming prompt execution with stream and response properties."""

    def __init__(
        self,
        channel: Channel[ModelResponseChunk, ModelResponse[OutputT]],
        response_future: asyncio.Future[ModelResponse[OutputT]],
    ) -> None:
        """Initialize with streaming channel and response future."""
        self._channel: Channel[ModelResponseChunk, ModelResponse[OutputT]] = channel
        self._response_future: asyncio.Future[ModelResponse[OutputT]] = response_future

    @property
    def stream(self) -> AsyncIterable[ModelResponseChunk]:
        """Get the async iterable of response chunks.

        Returns:
            An async iterable that yields ModelResponseChunk objects
            as they are received from the model. Each chunk contains:
            - text: The partial text generated so far
            - index: The chunk index
            - Additional metadata from the model
        """
        return self._channel

    @property
    def response(self) -> Awaitable[ModelResponse[OutputT]]:
        """Get the awaitable for the complete response.

        Returns:
            An awaitable that resolves to a ModelResponse containing:
            - text: The complete generated text
            - output: The typed output (when using Output[T])
            - messages: The full message history
            - usage: Token usage statistics
            - finish_reason: Why generation stopped (e.g., 'stop', 'length')
            - Any tool calls or interrupts from the response
        """
        return self._response_future

response property

Get the awaitable for the complete response.

Returns:

Type Description
Awaitable[ModelResponse[OutputT]]

An awaitable that resolves to a ModelResponse containing:

Awaitable[ModelResponse[OutputT]]
  • text: The complete generated text
Awaitable[ModelResponse[OutputT]]
  • output: The typed output (when using Output[T])
Awaitable[ModelResponse[OutputT]]
  • messages: The full message history
Awaitable[ModelResponse[OutputT]]
  • usage: Token usage statistics
Awaitable[ModelResponse[OutputT]]
  • finish_reason: Why generation stopped (e.g., 'stop', 'length')
Awaitable[ModelResponse[OutputT]]
  • Any tool calls or interrupts from the response

stream property

Get the async iterable of response chunks.

Returns:

Type Description
AsyncIterable[ModelResponseChunk]

An async iterable that yields ModelResponseChunk objects

AsyncIterable[ModelResponseChunk]

as they are received from the model. Each chunk contains:

AsyncIterable[ModelResponseChunk]
  • text: The partial text generated so far
AsyncIterable[ModelResponseChunk]
  • index: The chunk index
AsyncIterable[ModelResponseChunk]
  • Additional metadata from the model

__init__(channel, response_future)

Initialize with streaming channel and response future.

Source code in packages/genkit/src/genkit/_ai/_prompt.py
146
147
148
149
150
151
152
153
def __init__(
    self,
    channel: Channel[ModelResponseChunk, ModelResponse[OutputT]],
    response_future: asyncio.Future[ModelResponse[OutputT]],
) -> None:
    """Initialize with streaming channel and response future."""
    self._channel: Channel[ModelResponseChunk, ModelResponse[OutputT]] = channel
    self._response_future: asyncio.Future[ModelResponse[OutputT]] = response_future

genkit.GenkitError

Bases: Exception

Base error class for Genkit errors.

Source code in packages/genkit/src/genkit/_core/_error.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class GenkitError(Exception):
    """Base error class for Genkit errors."""

    def __init__(
        self,
        *,
        message: str,
        status: StatusName | None = None,
        cause: Exception | None = None,
        details: Any = None,  # noqa: ANN401
        trace_id: str | None = None,
        source: str | None = None,
    ) -> None:
        """Initialize a GenkitError.

        Args:
            message: The error message.
            status: The status name for this error.
            cause: The underlying exception that caused this error.
            details: Optional detail information.
            trace_id: A unique identifier for tracing the action execution.
            source: Optional source of the error.
        """
        temp_status: StatusName
        if status:
            temp_status = status
        elif isinstance(cause, GenkitError):
            temp_status = cause.status
        else:
            temp_status = 'INTERNAL'
        self.status: StatusName = temp_status
        self.http_code: int = http_status_code(temp_status)

        source_prefix = f'{source}: ' if source else ''
        super().__init__(f'{source_prefix}{self.status}: {message}')
        self.original_message: str = message

        if not details:
            details = {}
        if 'stack' not in details:
            details['stack'] = get_error_stack(cause if cause else self)
        if 'trace_id' not in details and trace_id:
            details['trace_id'] = trace_id

        self.details: Any = details
        self.source: str | None = source
        self.trace_id: str | None = trace_id
        self.cause: Exception | None = cause

    def to_callable_serializable(self) -> HttpErrorWireFormat:
        """Returns a JSON-serializable representation of this object.

        Returns:
            An HttpErrorWireFormat model instance.
        """
        # This error type is used by 3P authors with the field "details",
        # but the actual Callable protocol value is "details"
        return HttpErrorWireFormat(
            details=self.details,
            status=StatusCodes[self.status].name,
            message=repr(self.cause) if self.cause else self.original_message,
        )

    def to_serializable(self) -> ReflectionError:
        """Returns a JSON-serializable representation of this object.

        Returns:
            A ReflectionError model instance.
        """
        return ReflectionError(
            details=ReflectionErrorDetails(**self.details) if self.details else None,
            code=StatusCodes[self.status].value,
            message=f'{self.original_message}: {repr(self.cause)}' if self.cause else self.original_message,
        )

__init__(*, message, status=None, cause=None, details=None, trace_id=None, source=None)

Initialize a GenkitError.

Parameters:

Name Type Description Default
message str

The error message.

required
status StatusName | None

The status name for this error.

None
cause Exception | None

The underlying exception that caused this error.

None
details Any

Optional detail information.

None
trace_id str | None

A unique identifier for tracing the action execution.

None
source str | None

Optional source of the error.

None
Source code in packages/genkit/src/genkit/_core/_error.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def __init__(
    self,
    *,
    message: str,
    status: StatusName | None = None,
    cause: Exception | None = None,
    details: Any = None,  # noqa: ANN401
    trace_id: str | None = None,
    source: str | None = None,
) -> None:
    """Initialize a GenkitError.

    Args:
        message: The error message.
        status: The status name for this error.
        cause: The underlying exception that caused this error.
        details: Optional detail information.
        trace_id: A unique identifier for tracing the action execution.
        source: Optional source of the error.
    """
    temp_status: StatusName
    if status:
        temp_status = status
    elif isinstance(cause, GenkitError):
        temp_status = cause.status
    else:
        temp_status = 'INTERNAL'
    self.status: StatusName = temp_status
    self.http_code: int = http_status_code(temp_status)

    source_prefix = f'{source}: ' if source else ''
    super().__init__(f'{source_prefix}{self.status}: {message}')
    self.original_message: str = message

    if not details:
        details = {}
    if 'stack' not in details:
        details['stack'] = get_error_stack(cause if cause else self)
    if 'trace_id' not in details and trace_id:
        details['trace_id'] = trace_id

    self.details: Any = details
    self.source: str | None = source
    self.trace_id: str | None = trace_id
    self.cause: Exception | None = cause

to_callable_serializable()

Returns a JSON-serializable representation of this object.

Returns:

Type Description
HttpErrorWireFormat

An HttpErrorWireFormat model instance.

Source code in packages/genkit/src/genkit/_core/_error.py
214
215
216
217
218
219
220
221
222
223
224
225
226
def to_callable_serializable(self) -> HttpErrorWireFormat:
    """Returns a JSON-serializable representation of this object.

    Returns:
        An HttpErrorWireFormat model instance.
    """
    # This error type is used by 3P authors with the field "details",
    # but the actual Callable protocol value is "details"
    return HttpErrorWireFormat(
        details=self.details,
        status=StatusCodes[self.status].name,
        message=repr(self.cause) if self.cause else self.original_message,
    )

to_serializable()

Returns a JSON-serializable representation of this object.

Returns:

Type Description
ReflectionError

A ReflectionError model instance.

Source code in packages/genkit/src/genkit/_core/_error.py
228
229
230
231
232
233
234
235
236
237
238
def to_serializable(self) -> ReflectionError:
    """Returns a JSON-serializable representation of this object.

    Returns:
        A ReflectionError model instance.
    """
    return ReflectionError(
        details=ReflectionErrorDetails(**self.details) if self.details else None,
        code=StatusCodes[self.status].value,
        message=f'{self.original_message}: {repr(self.cause)}' if self.cause else self.original_message,
    )

genkit.PublicError

Bases: GenkitError

Error class for issues to be returned to users.

Using this error allows a web framework handler (e.g. FastAPI, Flask) to know it is safe to return the message in a request. Other kinds of errors will result in a generic 500 message to avoid the possibility of internal exceptions being leaked to attackers.

Source code in packages/genkit/src/genkit/_core/_error.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class PublicError(GenkitError):
    """Error class for issues to be returned to users.

    Using this error allows a web framework handler (e.g. FastAPI, Flask) to know it
    is safe to return the message in a request. Other kinds of errors will
    result in a generic 500 message to avoid the possibility of internal
    exceptions being leaked to attackers.
    """

    def __init__(self, status: StatusName, message: str, details: Any = None) -> None:  # noqa: ANN401
        """Initialize a PublicError.

        Args:
            status: The status name for this error.
            message: The error message.
            details: Optional details to include.
        """
        super().__init__(status=status, message=message, details=details)

__init__(status, message, details=None)

Initialize a PublicError.

Parameters:

Name Type Description Default
status StatusName

The status name for this error.

required
message str

The error message.

required
details Any

Optional details to include.

None
Source code in packages/genkit/src/genkit/_core/_error.py
250
251
252
253
254
255
256
257
258
def __init__(self, status: StatusName, message: str, details: Any = None) -> None:  # noqa: ANN401
    """Initialize a PublicError.

    Args:
        status: The status name for this error.
        message: The error message.
        details: Optional details to include.
    """
    super().__init__(status=status, message=message, details=details)

genkit.Interrupt

Bases: GenkitInterrupt

Exception for interrupting tool execution with user-facing API.

Raise Interrupt(metadata) from a tool or from tool middleware (e.g. wrap_tool). Exceptions from tool.run are wrapped in GenkitError with cause=Interrupt; generation attaches interrupt metadata to the pending tool request.

To resume, use respond_to_interrupt or restart_tool.

Source code in packages/genkit/src/genkit/_ai/_tools.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class Interrupt(GenkitInterrupt):  # noqa: N818 - public Genkit name; not renamed *Error for style
    """Exception for interrupting tool execution with user-facing API.

    Raise ``Interrupt(metadata)`` from a tool or from tool middleware (e.g. ``wrap_tool``).
    Exceptions from ``tool.run`` are wrapped in GenkitError
    with ``cause=Interrupt``; generation attaches interrupt metadata to the pending tool
    request.

    To resume, use ``respond_to_interrupt`` or ``restart_tool``.
    """

    def __init__(self, metadata: dict[str, Any] | None = None) -> None:
        """Initialize an Interrupt exception.

        Args:
            metadata: Attached to the tool request on the wire. Use a plain dict; for a
                Pydantic model, pass ``m.model_dump(mode="json")``.
        """
        super().__init__()
        self.metadata: dict[str, Any] = {} if metadata is None else metadata
        if self.metadata:
            span = trace_api.get_current_span()
            if span.is_recording():
                try:
                    span.set_attribute('genkit:metadata:interrupt', json.dumps(self.metadata))
                except Exception:
                    span.set_attribute('genkit:metadata:interrupt', str(self.metadata))

__init__(metadata=None)

Initialize an Interrupt exception.

Parameters:

Name Type Description Default
metadata dict[str, Any] | None

Attached to the tool request on the wire. Use a plain dict; for a Pydantic model, pass m.model_dump(mode="json").

None
Source code in packages/genkit/src/genkit/_ai/_tools.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(self, metadata: dict[str, Any] | None = None) -> None:
    """Initialize an Interrupt exception.

    Args:
        metadata: Attached to the tool request on the wire. Use a plain dict; for a
            Pydantic model, pass ``m.model_dump(mode="json")``.
    """
    super().__init__()
    self.metadata: dict[str, Any] = {} if metadata is None else metadata
    if self.metadata:
        span = trace_api.get_current_span()
        if span.is_recording():
            try:
                span.set_attribute('genkit:metadata:interrupt', json.dumps(self.metadata))
            except Exception:
                span.set_attribute('genkit:metadata:interrupt', str(self.metadata))

genkit.Message

Bases: MessageData

Message wrapper with utility properties for text and tool requests.

Source code in packages/genkit/src/genkit/_core/_model.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class Message(MessageData):
    """Message wrapper with utility properties for text and tool requests."""

    def __init__(
        self,
        message: MessageData | None = None,
        **kwargs: object,
    ) -> None:
        """Initialize from MessageData or keyword arguments."""
        if message is not None:
            if isinstance(message, dict):
                role = message.get('role')
                if role is None:
                    raise ValueError('Message role is required')
                super().__init__(
                    role=role,
                    content=message.get('content', []),
                    metadata=message.get('metadata'),
                )
            else:
                super().__init__(
                    role=message.role,
                    content=message.content,
                    metadata=message.metadata,
                )
        else:
            super().__init__(**kwargs)  # type: ignore[arg-type]

    def __eq__(self, other: object) -> bool:
        """Compare messages by role, content, and metadata."""
        if isinstance(other, MessageData):
            return self.role == other.role and self.content == other.content and self.metadata == other.metadata
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return identity-based hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """All text parts concatenated into a single string."""
        return text_from_message(self)

    @cached_property
    def tool_requests(self) -> list[ToolRequestPart]:
        """All tool request parts in this message."""
        return [p.root for p in self.content if isinstance(p.root, ToolRequestPart)]

    @cached_property
    def interrupts(self) -> list[ToolRequestPart]:
        """Tool requests marked as interrupted."""
        return [p for p in self.tool_requests if p.metadata and p.metadata.get('interrupt')]

interrupts cached property

Tool requests marked as interrupted.

text cached property

All text parts concatenated into a single string.

tool_requests cached property

All tool request parts in this message.

__eq__(other)

Compare messages by role, content, and metadata.

Source code in packages/genkit/src/genkit/_core/_model.py
108
109
110
111
112
def __eq__(self, other: object) -> bool:
    """Compare messages by role, content, and metadata."""
    if isinstance(other, MessageData):
        return self.role == other.role and self.content == other.content and self.metadata == other.metadata
    return super().__eq__(other)

__hash__()

Return identity-based hash.

Source code in packages/genkit/src/genkit/_core/_model.py
114
115
116
def __hash__(self) -> int:
    """Return identity-based hash."""
    return hash(id(self))

__init__(message=None, **kwargs)

Initialize from MessageData or keyword arguments.

Source code in packages/genkit/src/genkit/_core/_model.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(
    self,
    message: MessageData | None = None,
    **kwargs: object,
) -> None:
    """Initialize from MessageData or keyword arguments."""
    if message is not None:
        if isinstance(message, dict):
            role = message.get('role')
            if role is None:
                raise ValueError('Message role is required')
            super().__init__(
                role=role,
                content=message.get('content', []),
                metadata=message.get('metadata'),
            )
        else:
            super().__init__(
                role=message.role,
                content=message.content,
                metadata=message.metadata,
            )
    else:
        super().__init__(**kwargs)  # type: ignore[arg-type]

genkit.Part

Bases: RootModel[TextPart | MediaPart | ToolRequestPart | ToolResponsePart | DataPart | CustomPart | ReasoningPart | ResourcePart]

Root model for Part union (Part(root=X), DocumentPart(root=X)).

Source code in packages/genkit/src/genkit/_core/_typing.py
889
890
891
892
893
894
class Part(
    RootModel[
        TextPart | MediaPart | ToolRequestPart | ToolResponsePart | DataPart | CustomPart | ReasoningPart | ResourcePart
    ]
):
    """Root model for Part union (Part(root=X), DocumentPart(root=X))."""

genkit.TextPart

Bases: GenkitModel

Model for textpart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
439
440
441
442
443
444
445
446
447
448
449
450
451
class TextPart(GenkitModel):
    """Model for textpart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: str = Field(...)
    media: Any | None = Field(default=None)
    tool_request: Any | None = Field(default=None)
    tool_response: Any | None = Field(default=None)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom | None = None
    reasoning: Any | None = Field(default=None)
    resource: Any | None = Field(default=None)

genkit.MediaPart

Bases: GenkitModel

Model for mediapart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
322
323
324
325
326
327
328
329
330
331
332
333
334
class MediaPart(GenkitModel):
    """Model for mediapart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: Any | None = Field(default=None)
    media: Media = Field(...)
    tool_request: Any | None = Field(default=None)
    tool_response: Any | None = Field(default=None)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom | None = None
    reasoning: Any | None = Field(default=None)
    resource: Any | None = Field(default=None)

genkit.Media

Bases: GenkitModel

Model for media data.

Source code in packages/genkit/src/genkit/_core/_typing.py
499
500
501
502
503
504
class Media(GenkitModel):
    """Model for media data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    content_type: str | None = None
    url: str = Field(...)

genkit.CustomPart

Bases: GenkitModel

Model for custompart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
217
218
219
220
221
222
223
224
225
226
227
228
229
class CustomPart(GenkitModel):
    """Model for custompart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: Any | None = Field(default=None)
    media: Any | None = Field(default=None)
    tool_request: Any | None = Field(default=None)
    tool_response: Any | None = Field(default=None)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom = Field(...)
    reasoning: Any | None = Field(default=None)
    resource: Any | None = Field(default=None)

genkit.ReasoningPart

Bases: GenkitModel

Model for reasoningpart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
409
410
411
412
413
414
415
416
417
418
419
420
421
class ReasoningPart(GenkitModel):
    """Model for reasoningpart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: Any | None = Field(default=None)
    media: Any | None = Field(default=None)
    tool_request: Any | None = Field(default=None)
    tool_response: Any | None = Field(default=None)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom | None = None
    reasoning: str = Field(...)
    resource: Any | None = Field(default=None)

genkit.Role

Bases: StrEnum

Role data type class.

Source code in packages/genkit/src/genkit/_core/_typing.py
56
57
58
59
60
61
62
class Role(StrEnum):
    """Role data type class."""

    SYSTEM = 'system'
    USER = 'user'
    MODEL = 'model'
    TOOL = 'tool'

genkit.Metadata = dict[str, Any] module-attribute

genkit.ToolRequest

Bases: GenkitModel

Model for toolrequest data.

Source code in packages/genkit/src/genkit/_core/_typing.py
507
508
509
510
511
512
513
514
class ToolRequest(GenkitModel):
    """Model for toolrequest data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    ref: str | None = None
    name: str = Field(...)
    input: Any | None = Field(default=None)
    partial: bool | None = None

genkit.ToolRequestPart

Bases: GenkitModel

Model for toolrequestpart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
469
470
471
472
473
474
475
476
477
478
479
480
481
class ToolRequestPart(GenkitModel):
    """Model for toolrequestpart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: Any | None = Field(default=None)
    media: Any | None = Field(default=None)
    tool_request: ToolRequest = Field(...)
    tool_response: Any | None = Field(default=None)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom | None = None
    reasoning: Any | None = Field(default=None)
    resource: Any | None = Field(default=None)

genkit.ToolResponse

Bases: GenkitModel

Model for toolresponse data.

Source code in packages/genkit/src/genkit/_core/_typing.py
517
518
519
520
521
522
523
524
class ToolResponse(GenkitModel):
    """Model for toolresponse data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    ref: str | None = None
    name: str = Field(...)
    output: Any | None = Field(default=None)
    content: list[Any] | None = None

genkit.ToolResponsePart

Bases: GenkitModel

Model for toolresponsepart data.

Source code in packages/genkit/src/genkit/_core/_typing.py
484
485
486
487
488
489
490
491
492
493
494
495
496
class ToolResponsePart(GenkitModel):
    """Model for toolresponsepart data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    text: Any | None = Field(default=None)
    media: Any | None = Field(default=None)
    tool_request: Any | None = Field(default=None)
    tool_response: ToolResponse = Field(...)
    data: Any | None = Field(default=None)
    metadata: Metadata | None = None
    custom: Custom | None = None
    reasoning: Any | None = Field(default=None)
    resource: Any | None = Field(default=None)

genkit.ToolDefinition

Bases: GenkitModel

Model for tooldefinition data.

Source code in packages/genkit/src/genkit/_core/_typing.py
454
455
456
457
458
459
460
461
462
463
464
465
466
class ToolDefinition(GenkitModel):
    """Model for tooldefinition data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    name: str = Field(...)
    description: str = Field(...)
    input_schema: Any | dict[str, Any] | None = Field(
        default=None, description='Valid JSON Schema representing the input of the tool.'
    )
    output_schema: Any | dict[str, Any] | None = Field(
        default=None, description='Valid JSON Schema describing the output of the tool.'
    )
    metadata: Metadata | None = None

genkit.ToolChoice

Bases: StrEnum

Tool choice for generation (auto, required, none).

Source code in packages/genkit/src/genkit/_core/_typing.py
924
925
926
927
928
929
class ToolChoice(StrEnum):
    """Tool choice for generation (auto, required, none)."""

    AUTO = 'auto'
    REQUIRED = 'required'
    NONE = 'none'

genkit.Document

Bases: DocumentData

Multi-part document that can be embedded, indexed, or retrieved.

Source code in packages/genkit/src/genkit/_core/_model.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class Document(DocumentData):
    """Multi-part document that can be embedded, indexed, or retrieved."""

    def __init__(
        self,
        content: list[DocumentPart],
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Initialize with content parts and optional metadata."""
        doc_content = deepcopy(content)
        doc_metadata = deepcopy(metadata)
        super().__init__(content=doc_content, metadata=doc_metadata)

    @staticmethod
    def from_text(text: str, metadata: dict[str, Any] | None = None) -> Document:
        """Create a document from a text string."""
        return Document(content=[DocumentPart(root=TextPart(text=text))], metadata=metadata)

    @staticmethod
    def from_media(
        url: str,
        content_type: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Document:
        """Create a document from a media URL."""
        return Document(
            content=[DocumentPart(root=MediaPart(media=Media(url=url, content_type=content_type)))],
            metadata=metadata,
        )

    @staticmethod
    def from_data(
        data: str,
        data_type: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Document:
        """Create a document from data, inferring text vs media from data_type."""
        if data_type == _TEXT_DATA_TYPE:
            return Document.from_text(data, metadata)
        return Document.from_media(data, data_type, metadata)

    @cached_property
    def text(self) -> str:
        """Concatenate all text parts."""
        texts = []
        for p in self.content:
            part = p.root if hasattr(p, 'root') else p
            text_val = getattr(part, 'text', None)
            if isinstance(text_val, str):
                texts.append(text_val)
        return ''.join(texts)

    @cached_property
    def media(self) -> list[Media]:
        """Get all media parts."""
        return [
            part.root.media for part in self.content if isinstance(part.root, MediaPart) and part.root.media is not None
        ]

    @cached_property
    def data(self) -> str:
        """Primary data: text if available, otherwise first media URL."""
        if self.text:
            return self.text
        if self.media:
            return self.media[0].url
        return ''

    @cached_property
    def data_type(self) -> str | None:
        """Type of primary data: 'text' or first media's content type."""
        if self.text:
            return _TEXT_DATA_TYPE
        if self.media and self.media[0].content_type:
            return self.media[0].content_type
        return None

data cached property

Primary data: text if available, otherwise first media URL.

data_type cached property

Type of primary data: 'text' or first media's content type.

media cached property

Get all media parts.

text cached property

Concatenate all text parts.

__init__(content, metadata=None)

Initialize with content parts and optional metadata.

Source code in packages/genkit/src/genkit/_core/_model.py
151
152
153
154
155
156
157
158
159
def __init__(
    self,
    content: list[DocumentPart],
    metadata: dict[str, Any] | None = None,
) -> None:
    """Initialize with content parts and optional metadata."""
    doc_content = deepcopy(content)
    doc_metadata = deepcopy(metadata)
    super().__init__(content=doc_content, metadata=doc_metadata)

from_data(data, data_type=None, metadata=None) staticmethod

Create a document from data, inferring text vs media from data_type.

Source code in packages/genkit/src/genkit/_core/_model.py
178
179
180
181
182
183
184
185
186
187
@staticmethod
def from_data(
    data: str,
    data_type: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Document:
    """Create a document from data, inferring text vs media from data_type."""
    if data_type == _TEXT_DATA_TYPE:
        return Document.from_text(data, metadata)
    return Document.from_media(data, data_type, metadata)

from_media(url, content_type=None, metadata=None) staticmethod

Create a document from a media URL.

Source code in packages/genkit/src/genkit/_core/_model.py
166
167
168
169
170
171
172
173
174
175
176
@staticmethod
def from_media(
    url: str,
    content_type: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Document:
    """Create a document from a media URL."""
    return Document(
        content=[DocumentPart(root=MediaPart(media=Media(url=url, content_type=content_type)))],
        metadata=metadata,
    )

from_text(text, metadata=None) staticmethod

Create a document from a text string.

Source code in packages/genkit/src/genkit/_core/_model.py
161
162
163
164
@staticmethod
def from_text(text: str, metadata: dict[str, Any] | None = None) -> Document:
    """Create a document from a text string."""
    return Document(content=[DocumentPart(root=TextPart(text=text))], metadata=metadata)

genkit.DocumentPart

Bases: RootModel[TextPart | MediaPart]

Root model for DocumentPart union (Part(root=X), DocumentPart(root=X)).

Source code in packages/genkit/src/genkit/_core/_typing.py
885
886
class DocumentPart(RootModel[TextPart | MediaPart]):
    """Root model for DocumentPart union (Part(root=X), DocumentPart(root=X))."""

genkit.EmbedderRef

Bases: BaseModel

Reference to an embedder with configuration.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
53
54
55
56
57
58
59
60
class EmbedderRef(BaseModel):
    """Reference to an embedder with configuration."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True)

    name: str
    config: Any | None = None
    version: str | None = None

genkit.EmbedderOptions

Bases: BaseModel

Configuration options for an embedder.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
42
43
44
45
46
47
48
49
50
class EmbedderOptions(BaseModel):
    """Configuration options for an embedder."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True, alias_generator=to_camel)

    config_schema: dict[str, Any] | None = None
    label: str | None = None
    supports: EmbedderSupports | None = None
    dimensions: int | None = None

genkit.Embedding

Bases: GenkitModel

Model for embedding data.

Source code in packages/genkit/src/genkit/_core/_typing.py
105
106
107
108
109
110
class Embedding(GenkitModel):
    """Model for embedding data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    embedding: list[float] = Field(...)
    metadata: Metadata | None = None

genkit.EmbedRequest

Bases: GenkitModel

Model for embedrequest data.

Source code in packages/genkit/src/genkit/_core/_typing.py
90
91
92
93
94
95
class EmbedRequest(GenkitModel):
    """Model for embedrequest data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    input: list[DocumentData] = Field(...)
    options: Any | None = Field(default=None)

genkit.EmbedResponse

Bases: GenkitModel

Model for embedresponse data.

Source code in packages/genkit/src/genkit/_core/_typing.py
 98
 99
100
101
102
class EmbedResponse(GenkitModel):
    """Model for embedresponse data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    embeddings: list[Embedding] = Field(...)

genkit.ModelRequest

Bases: GenkitModel, Generic[ConfigT]

Hand-written model request with flat output fields and veneer types.

Output config is inlined as flat fields (output_format, output_schema, etc.) instead of a nested OutputConfig object. Messages and docs use veneer types (Message, Document) for convenience methods like .text.

Example

class GeminiConfig(ModelConfig): safety_settings: dict[str, str] | None = None

def gemini_model(request: ModelRequest[GeminiConfig]) -> ModelResponse: temp = request.config.temperature # inherited from ModelConfig for msg in request.messages: print(msg.text) # Message veneer property if request.output_format == 'json': schema = request.output_schema

Source code in packages/genkit/src/genkit/_core/_model.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class ModelRequest(GenkitModel, Generic[ConfigT]):
    """Hand-written model request with flat output fields and veneer types.

    Output config is inlined as flat fields (output_format, output_schema, etc.)
    instead of a nested OutputConfig object. Messages and docs use veneer types
    (Message, Document) for convenience methods like .text.

    Example:
        class GeminiConfig(ModelConfig):
            safety_settings: dict[str, str] | None = None

        def gemini_model(request: ModelRequest[GeminiConfig]) -> ModelResponse:
            temp = request.config.temperature  # inherited from ModelConfig
            for msg in request.messages:
                print(msg.text)  # Message veneer property
            if request.output_format == 'json':
                schema = request.output_schema
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='allow', populate_by_name=True)
    # Veneer types for IDE/typing (validators wrap MessageData->Message, DocumentData->Document)
    messages: list[Message]  # pyright: ignore[reportIncompatibleVariableOverride]
    docs: list[Document] | None = None  # pyright: ignore[reportIncompatibleVariableOverride]
    config: ConfigT | None = None
    tools: list[ToolDefinition] | None = None
    tool_choice: ToolChoice | None = Field(default=None)
    # Flat output fields (no nested OutputConfig)
    output_format: str | None = None
    output_schema: dict[str, Any] | None = None
    output_constrained: bool | None = None
    output_content_type: str | None = None

    @field_validator('messages', mode='before')
    @classmethod
    def _wrap_messages(cls, v: list[MessageData]) -> list[Message]:
        """Wrap MessageData in Message veneer for convenience methods."""
        # pyrefly: ignore[bad-return]
        return [m if isinstance(m, Message) else Message(m) for m in v]

    @field_validator('docs', mode='before')
    @classmethod
    def _wrap_docs(cls, v: list[DocumentData] | None) -> list[Document] | None:
        """Wrap DocumentData in Document veneer for convenience methods."""
        if v is None:
            return None
        # pyrefly: ignore[bad-return]
        return [d if isinstance(d, Document) else Document(d.content, d.metadata) for d in v]

    @model_serializer(mode='wrap')
    def _serialize_for_spec(self, serializer: Callable[..., dict[str, Any]]) -> dict[str, Any]:
        """Serialize to spec wire format with nested output (matches JS/Go)."""
        data = serializer(self)
        # Build nested output from flat fields - spec expects output key always present
        output: dict[str, Any] = {}
        if self.output_format is not None:
            output['format'] = self.output_format
        if self.output_schema is not None:
            output['schema'] = self.output_schema
        if self.output_constrained is not None:
            output['constrained'] = self.output_constrained
        if self.output_content_type is not None:
            output['contentType'] = self.output_content_type
        # Remove flat fields, add nested output
        data.pop('outputFormat', None)
        data.pop('outputSchema', None)
        data.pop('outputConstrained', None)
        data.pop('outputContentType', None)
        data['output'] = output
        return data

genkit.ModelResponse

Bases: GenkitModel, Generic[OutputT]

Model response with utilities for text extraction, output parsing, and validation.

Source code in packages/genkit/src/genkit/_core/_model.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
class ModelResponse(GenkitModel, Generic[OutputT]):
    """Model response with utilities for text extraction, output parsing, and validation."""

    # _message_parser and _schema_type are set by the framework after construction
    # when output format parsing or schema validation is needed.
    _message_parser: Callable[[Message], object] | None = PrivateAttr(None)
    _schema_type: type[BaseModel] | None = PrivateAttr(None)
    # Wire fields (must be declared for extra='forbid' to accept wire responses)
    message: Message | None = None
    finish_reason: FinishReason | None = None
    finish_message: str | None = None
    latency_ms: float | None = None
    usage: GenerationUsage | None = None
    custom: dict[str, Any] | None = None
    raw: dict[str, Any] | None = None
    request: ModelRequest | None = None
    operation: Operation | None = None
    candidates: list[Candidate] | None = None

    def model_post_init(self, __context: object) -> None:
        """Initialize default usage and custom dict if not provided."""
        if self.usage is None:
            self.usage = GenerationUsage()
        if self.custom is None:
            self.custom = {}

    def assert_valid(self) -> None:
        """Validate response structure. (TODO: not yet implemented)."""
        # TODO(#4343): implement
        pass

    def assert_valid_schema(self) -> None:
        """Validate response conforms to output schema. (TODO: not yet implemented)."""
        # TODO(#4343): implement
        pass

    def __eq__(self, other: object) -> bool:
        """Compare responses by message and finish_reason."""
        if isinstance(other, ModelResponse):
            return self.message == other.message and self.finish_reason == other.finish_reason
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return identity-based hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """All text parts concatenated into a single string."""
        if self.message is None:
            return ''
        return self.message.text

    @cached_property
    def output(self) -> OutputT:
        """Parsed JSON output from the response text, validated against schema if set."""
        if self._message_parser and self.message is not None:
            parsed = self._message_parser(self.message)
        else:
            parsed = extract_json(self.text)

        # If we have a schema type and the parsed output is a dict, validate and
        # return a proper Pydantic instance. Skip if parsed is already the correct
        # type or if it's not a dict (e.g., custom formats may return strings).
        if self._schema_type is not None and parsed is not None and isinstance(parsed, dict):
            return cast(OutputT, self._schema_type.model_validate(parsed))

        return cast(OutputT, parsed)

    @cached_property
    def messages(self) -> list[Message]:
        """All messages including request history and the response message."""
        if self.message is None:
            return [Message(m) for m in self.request.messages] if self.request else []
        return [
            *(Message(m) for m in (self.request.messages if self.request else [])),
            self.message,
        ]

    @cached_property
    def tool_requests(self) -> list[ToolRequestPart]:
        """All tool request parts in the response message."""
        if self.message is None:
            return []
        return self.message.tool_requests

    @cached_property
    def media(self) -> list[Media]:
        """All media parts in the response message."""
        if self.message is None:
            return []
        return [
            part.root.media
            for part in self.message.content
            if isinstance(part.root, MediaPart) and part.root.media is not None
        ]

    @cached_property
    def interrupts(self) -> list[ToolRequestPart]:
        """Tool requests marked as interrupted."""
        if self.message is None:
            return []
        return self.message.interrupts

interrupts cached property

Tool requests marked as interrupted.

media cached property

All media parts in the response message.

messages cached property

All messages including request history and the response message.

output cached property

Parsed JSON output from the response text, validated against schema if set.

text cached property

All text parts concatenated into a single string.

tool_requests cached property

All tool request parts in the response message.

__eq__(other)

Compare responses by message and finish_reason.

Source code in packages/genkit/src/genkit/_core/_model.py
333
334
335
336
337
def __eq__(self, other: object) -> bool:
    """Compare responses by message and finish_reason."""
    if isinstance(other, ModelResponse):
        return self.message == other.message and self.finish_reason == other.finish_reason
    return super().__eq__(other)

__hash__()

Return identity-based hash.

Source code in packages/genkit/src/genkit/_core/_model.py
339
340
341
def __hash__(self) -> int:
    """Return identity-based hash."""
    return hash(id(self))

assert_valid()

Validate response structure. (TODO: not yet implemented).

Source code in packages/genkit/src/genkit/_core/_model.py
323
324
325
326
def assert_valid(self) -> None:
    """Validate response structure. (TODO: not yet implemented)."""
    # TODO(#4343): implement
    pass

assert_valid_schema()

Validate response conforms to output schema. (TODO: not yet implemented).

Source code in packages/genkit/src/genkit/_core/_model.py
328
329
330
331
def assert_valid_schema(self) -> None:
    """Validate response conforms to output schema. (TODO: not yet implemented)."""
    # TODO(#4343): implement
    pass

model_post_init(__context)

Initialize default usage and custom dict if not provided.

Source code in packages/genkit/src/genkit/_core/_model.py
316
317
318
319
320
321
def model_post_init(self, __context: object) -> None:
    """Initialize default usage and custom dict if not provided."""
    if self.usage is None:
        self.usage = GenerationUsage()
    if self.custom is None:
        self.custom = {}

genkit.ModelResponseChunk

Bases: GenerateResponseChunk, Generic[OutputT]

Streaming chunk with text, accumulated text, and output parsing.

Source code in packages/genkit/src/genkit/_core/_model.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class ModelResponseChunk(GenerateResponseChunk, Generic[OutputT]):
    """Streaming chunk with text, accumulated text, and output parsing."""

    # Field(exclude=True) means these fields are not included in serialization
    previous_chunks: list[ModelResponseChunk[Any]] = Field(default_factory=list, exclude=True)
    chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = Field(None, exclude=True)

    def __init__(
        self,
        chunk: ModelResponseChunk[Any] | None = None,
        previous_chunks: list[ModelResponseChunk[Any]] | None = None,
        index: int | float | None = None,
        chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = None,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize from a chunk or keyword arguments."""
        if chunk is not None:
            # Framework wrapping mode
            super().__init__(
                role=chunk.role,
                index=index,
                content=chunk.content,
                custom=chunk.custom,
                aggregated=chunk.aggregated,
            )
        else:
            # No source chunk — caller passes fields (role, content, etc.) as kwargs directly
            super().__init__(**kwargs)
        self.previous_chunks = previous_chunks or []
        self.chunk_parser = chunk_parser

    def __eq__(self, other: object) -> bool:
        """Check equality."""
        if isinstance(other, ModelResponseChunk):
            return self.role == other.role and self.content == other.content
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """Text content of this chunk."""
        parts: list[str] = []
        for p in self.content:
            text_val = p.root.text
            if text_val is not None:
                # Handle Text RootModel (access .root) or plain str
                if isinstance(text_val, Text):
                    parts.append(str(text_val.root) if text_val.root is not None else '')
                else:
                    parts.append(str(text_val))
        return ''.join(parts)

    @cached_property
    def accumulated_text(self) -> str:
        """Text from all previous chunks plus this chunk."""
        parts: list[str] = []
        if self.previous_chunks:
            for chunk in self.previous_chunks:
                for p in chunk.content:
                    text_val = p.root.text
                    if text_val:
                        # Handle Text RootModel (access .root) or plain str
                        if isinstance(text_val, Text):
                            parts.append(str(text_val.root) if text_val.root is not None else '')
                        else:
                            parts.append(str(text_val))
        return ''.join(parts) + self.text

    @cached_property
    def output(self) -> OutputT:
        """Parsed JSON output from accumulated text."""
        if self.chunk_parser:
            return cast(OutputT, self.chunk_parser(self))
        return cast(OutputT, extract_json(self.accumulated_text))

accumulated_text cached property

Text from all previous chunks plus this chunk.

output cached property

Parsed JSON output from accumulated text.

text cached property

Text content of this chunk.

__eq__(other)

Check equality.

Source code in packages/genkit/src/genkit/_core/_model.py
433
434
435
436
437
def __eq__(self, other: object) -> bool:
    """Check equality."""
    if isinstance(other, ModelResponseChunk):
        return self.role == other.role and self.content == other.content
    return super().__eq__(other)

__hash__()

Return hash.

Source code in packages/genkit/src/genkit/_core/_model.py
439
440
441
def __hash__(self) -> int:
    """Return hash."""
    return hash(id(self))

__init__(chunk=None, previous_chunks=None, index=None, chunk_parser=None, **kwargs)

Initialize from a chunk or keyword arguments.

Source code in packages/genkit/src/genkit/_core/_model.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def __init__(
    self,
    chunk: ModelResponseChunk[Any] | None = None,
    previous_chunks: list[ModelResponseChunk[Any]] | None = None,
    index: int | float | None = None,
    chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = None,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize from a chunk or keyword arguments."""
    if chunk is not None:
        # Framework wrapping mode
        super().__init__(
            role=chunk.role,
            index=index,
            content=chunk.content,
            custom=chunk.custom,
            aggregated=chunk.aggregated,
        )
    else:
        # No source chunk — caller passes fields (role, content, etc.) as kwargs directly
        super().__init__(**kwargs)
    self.previous_chunks = previous_chunks or []
    self.chunk_parser = chunk_parser

genkit.ModelConfig = GenerationCommonConfig module-attribute

genkit.ModelInfo

Bases: GenkitModel

Model for modelinfo data.

Source code in packages/genkit/src/genkit/_core/_typing.py
346
347
348
349
350
351
352
353
354
class ModelInfo(GenkitModel):
    """Model for modelinfo data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    versions: list[str] | None = None
    label: str | None = None
    config_schema: ConfigSchema | None = None
    supports: Supports | None = None
    stage: Stage | None = None

genkit.ModelUsage = GenerationUsage module-attribute

genkit.Constrained

Bases: StrEnum

Constrained generation support (none, all, no-tools).

Source code in packages/genkit/src/genkit/_core/_typing.py
906
907
908
909
910
911
class Constrained(StrEnum):
    """Constrained generation support (none, all, no-tools)."""

    NONE = 'none'
    ALL = 'all'
    NO_TOOLS = 'no-tools'

genkit.Stage

Bases: StrEnum

Model stage (featured, stable, unstable, legacy, deprecated).

Source code in packages/genkit/src/genkit/_core/_typing.py
914
915
916
917
918
919
920
921
class Stage(StrEnum):
    """Model stage (featured, stable, unstable, legacy, deprecated)."""

    FEATURED = 'featured'
    STABLE = 'stable'
    UNSTABLE = 'unstable'
    LEGACY = 'legacy'
    DEPRECATED = 'deprecated'

genkit.Supports

Bases: GenkitModel

Model for supports data.

Source code in packages/genkit/src/genkit/_core/_typing.py
799
800
801
802
803
804
805
806
807
808
809
810
811
812
class Supports(GenkitModel):
    """Model for supports data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    multiturn: bool | None = None
    media: bool | None = None
    tools: bool | None = None
    system_role: bool | None = None
    output: list[str] | None = None
    content_type: list[str] | None = None
    context: bool | None = None
    constrained: Constrained | None = None
    tool_choice: bool | None = None
    long_running: bool | None = None

genkit.FinishReason

Bases: StrEnum

FinishReason data type class.

Source code in packages/genkit/src/genkit/_core/_typing.py
45
46
47
48
49
50
51
52
53
class FinishReason(StrEnum):
    """FinishReason data type class."""

    STOP = 'stop'
    LENGTH = 'length'
    BLOCKED = 'blocked'
    INTERRUPTED = 'interrupted'
    OTHER = 'other'
    UNKNOWN = 'unknown'

genkit.model

genkit.model.BackgroundAction

Bases: Generic[OutputT]

A background action that can run for a long time.

Unlike regular actions, background actions can run for extended periods. The returned operation can be used to check status and retrieve the response.

This class matches the JS BackgroundAction interface from js/core/src/background-action.ts.

Attributes:

Name Type Description
__action

Action metadata (matches JS __action property).

start_action

Action to start the operation.

check_action

Action to check operation status.

cancel_action

Optional action to cancel operations.

supports_cancel bool

Whether this action supports cancellation.

Source code in packages/genkit/src/genkit/_core/_background.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class BackgroundAction(Generic[OutputT]):
    """A background action that can run for a long time.

    Unlike regular actions, background actions can run for extended periods.
    The returned operation can be used to check status and retrieve the response.

    This class matches the JS BackgroundAction interface from
    js/core/src/background-action.ts.

    Attributes:
        __action: Action metadata (matches JS __action property).
        start_action: Action to start the operation.
        check_action: Action to check operation status.
        cancel_action: Optional action to cancel operations.
        supports_cancel: Whether this action supports cancellation.
    """

    def __init__(
        self,
        start_action: Action,
        check_action: Action,
        cancel_action: Action | None = None,
    ) -> None:
        """Initialize a BackgroundAction.

        Args:
            start_action: Action to start the operation.
            check_action: Action to check operation status.
            cancel_action: Optional action to cancel the operation.
        """
        self.start_action = start_action
        self.check_action = check_action
        self.cancel_action = cancel_action

        # Match JS __action property structure
        self.__action = {
            'name': start_action.name,
            'description': start_action.description,
            'actionType': start_action.kind,
            'metadata': start_action.metadata,
        }

    @property
    def name(self) -> str:
        """The name of the background action."""
        return self.start_action.name

    @property
    def supports_cancel(self) -> bool:
        """Whether this background action supports cancellation."""
        return self.cancel_action is not None

    async def start(
        self,
        input: ModelRequest | None = None,
        options: dict[str, Any] | None = None,
    ) -> Operation:
        """Start a background operation.

        Matches JS: start(input?, options?) => Promise<Operation<OutputT>>

        Args:
            input: The input request.
            options: Optional run options.

        Returns:
            An Operation with an ID to track the job.
        """
        result = await self.start_action.run(input)
        return _ensure_operation(result.response)

    async def check(self, operation: Operation) -> Operation:
        """Check the status of a background operation.

        Matches JS: check(operation) => Promise<Operation<OutputT>>

        Args:
            operation: The operation to check.

        Returns:
            Updated Operation with current status.
        """
        result = await self.check_action.run(operation)
        return _ensure_operation(result.response)

    async def cancel(self, operation: Operation) -> Operation:
        """Cancel a background operation.

        Matches JS: cancel(operation) => Promise<Operation<OutputT>>

        If cancellation is not supported, returns the operation unchanged
        (matching JS behavior).

        Args:
            operation: The operation to cancel.

        Returns:
            Updated Operation reflecting cancellation attempt.
        """
        if self.cancel_action is None:
            # Match JS behavior: return operation unchanged if cancel not supported
            return operation
        result = await self.cancel_action.run(operation)
        return _ensure_operation(result.response)

name property

The name of the background action.

supports_cancel property

Whether this background action supports cancellation.

__init__(start_action, check_action, cancel_action=None)

Initialize a BackgroundAction.

Parameters:

Name Type Description Default
start_action Action

Action to start the operation.

required
check_action Action

Action to check operation status.

required
cancel_action Action | None

Optional action to cancel the operation.

None
Source code in packages/genkit/src/genkit/_core/_background.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    start_action: Action,
    check_action: Action,
    cancel_action: Action | None = None,
) -> None:
    """Initialize a BackgroundAction.

    Args:
        start_action: Action to start the operation.
        check_action: Action to check operation status.
        cancel_action: Optional action to cancel the operation.
    """
    self.start_action = start_action
    self.check_action = check_action
    self.cancel_action = cancel_action

    # Match JS __action property structure
    self.__action = {
        'name': start_action.name,
        'description': start_action.description,
        'actionType': start_action.kind,
        'metadata': start_action.metadata,
    }

cancel(operation) async

Cancel a background operation.

Matches JS: cancel(operation) => Promise>

If cancellation is not supported, returns the operation unchanged (matching JS behavior).

Parameters:

Name Type Description Default
operation Operation

The operation to cancel.

required

Returns:

Type Description
Operation

Updated Operation reflecting cancellation attempt.

Source code in packages/genkit/src/genkit/_core/_background.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
async def cancel(self, operation: Operation) -> Operation:
    """Cancel a background operation.

    Matches JS: cancel(operation) => Promise<Operation<OutputT>>

    If cancellation is not supported, returns the operation unchanged
    (matching JS behavior).

    Args:
        operation: The operation to cancel.

    Returns:
        Updated Operation reflecting cancellation attempt.
    """
    if self.cancel_action is None:
        # Match JS behavior: return operation unchanged if cancel not supported
        return operation
    result = await self.cancel_action.run(operation)
    return _ensure_operation(result.response)

check(operation) async

Check the status of a background operation.

Matches JS: check(operation) => Promise>

Parameters:

Name Type Description Default
operation Operation

The operation to check.

required

Returns:

Type Description
Operation

Updated Operation with current status.

Source code in packages/genkit/src/genkit/_core/_background.py
133
134
135
136
137
138
139
140
141
142
143
144
145
async def check(self, operation: Operation) -> Operation:
    """Check the status of a background operation.

    Matches JS: check(operation) => Promise<Operation<OutputT>>

    Args:
        operation: The operation to check.

    Returns:
        Updated Operation with current status.
    """
    result = await self.check_action.run(operation)
    return _ensure_operation(result.response)

start(input=None, options=None) async

Start a background operation.

Matches JS: start(input?, options?) => Promise>

Parameters:

Name Type Description Default
input ModelRequest | None

The input request.

None
options dict[str, Any] | None

Optional run options.

None

Returns:

Type Description
Operation

An Operation with an ID to track the job.

Source code in packages/genkit/src/genkit/_core/_background.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
async def start(
    self,
    input: ModelRequest | None = None,
    options: dict[str, Any] | None = None,
) -> Operation:
    """Start a background operation.

    Matches JS: start(input?, options?) => Promise<Operation<OutputT>>

    Args:
        input: The input request.
        options: Optional run options.

    Returns:
        An Operation with an ID to track the job.
    """
    result = await self.start_action.run(input)
    return _ensure_operation(result.response)

genkit.model.ModelRequest

Bases: GenkitModel, Generic[ConfigT]

Hand-written model request with flat output fields and veneer types.

Output config is inlined as flat fields (output_format, output_schema, etc.) instead of a nested OutputConfig object. Messages and docs use veneer types (Message, Document) for convenience methods like .text.

Example

class GeminiConfig(ModelConfig): safety_settings: dict[str, str] | None = None

def gemini_model(request: ModelRequest[GeminiConfig]) -> ModelResponse: temp = request.config.temperature # inherited from ModelConfig for msg in request.messages: print(msg.text) # Message veneer property if request.output_format == 'json': schema = request.output_schema

Source code in packages/genkit/src/genkit/_core/_model.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class ModelRequest(GenkitModel, Generic[ConfigT]):
    """Hand-written model request with flat output fields and veneer types.

    Output config is inlined as flat fields (output_format, output_schema, etc.)
    instead of a nested OutputConfig object. Messages and docs use veneer types
    (Message, Document) for convenience methods like .text.

    Example:
        class GeminiConfig(ModelConfig):
            safety_settings: dict[str, str] | None = None

        def gemini_model(request: ModelRequest[GeminiConfig]) -> ModelResponse:
            temp = request.config.temperature  # inherited from ModelConfig
            for msg in request.messages:
                print(msg.text)  # Message veneer property
            if request.output_format == 'json':
                schema = request.output_schema
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='allow', populate_by_name=True)
    # Veneer types for IDE/typing (validators wrap MessageData->Message, DocumentData->Document)
    messages: list[Message]  # pyright: ignore[reportIncompatibleVariableOverride]
    docs: list[Document] | None = None  # pyright: ignore[reportIncompatibleVariableOverride]
    config: ConfigT | None = None
    tools: list[ToolDefinition] | None = None
    tool_choice: ToolChoice | None = Field(default=None)
    # Flat output fields (no nested OutputConfig)
    output_format: str | None = None
    output_schema: dict[str, Any] | None = None
    output_constrained: bool | None = None
    output_content_type: str | None = None

    @field_validator('messages', mode='before')
    @classmethod
    def _wrap_messages(cls, v: list[MessageData]) -> list[Message]:
        """Wrap MessageData in Message veneer for convenience methods."""
        # pyrefly: ignore[bad-return]
        return [m if isinstance(m, Message) else Message(m) for m in v]

    @field_validator('docs', mode='before')
    @classmethod
    def _wrap_docs(cls, v: list[DocumentData] | None) -> list[Document] | None:
        """Wrap DocumentData in Document veneer for convenience methods."""
        if v is None:
            return None
        # pyrefly: ignore[bad-return]
        return [d if isinstance(d, Document) else Document(d.content, d.metadata) for d in v]

    @model_serializer(mode='wrap')
    def _serialize_for_spec(self, serializer: Callable[..., dict[str, Any]]) -> dict[str, Any]:
        """Serialize to spec wire format with nested output (matches JS/Go)."""
        data = serializer(self)
        # Build nested output from flat fields - spec expects output key always present
        output: dict[str, Any] = {}
        if self.output_format is not None:
            output['format'] = self.output_format
        if self.output_schema is not None:
            output['schema'] = self.output_schema
        if self.output_constrained is not None:
            output['constrained'] = self.output_constrained
        if self.output_content_type is not None:
            output['contentType'] = self.output_content_type
        # Remove flat fields, add nested output
        data.pop('outputFormat', None)
        data.pop('outputSchema', None)
        data.pop('outputConstrained', None)
        data.pop('outputContentType', None)
        data['output'] = output
        return data

genkit.model.ModelResponse

Bases: GenkitModel, Generic[OutputT]

Model response with utilities for text extraction, output parsing, and validation.

Source code in packages/genkit/src/genkit/_core/_model.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
class ModelResponse(GenkitModel, Generic[OutputT]):
    """Model response with utilities for text extraction, output parsing, and validation."""

    # _message_parser and _schema_type are set by the framework after construction
    # when output format parsing or schema validation is needed.
    _message_parser: Callable[[Message], object] | None = PrivateAttr(None)
    _schema_type: type[BaseModel] | None = PrivateAttr(None)
    # Wire fields (must be declared for extra='forbid' to accept wire responses)
    message: Message | None = None
    finish_reason: FinishReason | None = None
    finish_message: str | None = None
    latency_ms: float | None = None
    usage: GenerationUsage | None = None
    custom: dict[str, Any] | None = None
    raw: dict[str, Any] | None = None
    request: ModelRequest | None = None
    operation: Operation | None = None
    candidates: list[Candidate] | None = None

    def model_post_init(self, __context: object) -> None:
        """Initialize default usage and custom dict if not provided."""
        if self.usage is None:
            self.usage = GenerationUsage()
        if self.custom is None:
            self.custom = {}

    def assert_valid(self) -> None:
        """Validate response structure. (TODO: not yet implemented)."""
        # TODO(#4343): implement
        pass

    def assert_valid_schema(self) -> None:
        """Validate response conforms to output schema. (TODO: not yet implemented)."""
        # TODO(#4343): implement
        pass

    def __eq__(self, other: object) -> bool:
        """Compare responses by message and finish_reason."""
        if isinstance(other, ModelResponse):
            return self.message == other.message and self.finish_reason == other.finish_reason
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return identity-based hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """All text parts concatenated into a single string."""
        if self.message is None:
            return ''
        return self.message.text

    @cached_property
    def output(self) -> OutputT:
        """Parsed JSON output from the response text, validated against schema if set."""
        if self._message_parser and self.message is not None:
            parsed = self._message_parser(self.message)
        else:
            parsed = extract_json(self.text)

        # If we have a schema type and the parsed output is a dict, validate and
        # return a proper Pydantic instance. Skip if parsed is already the correct
        # type or if it's not a dict (e.g., custom formats may return strings).
        if self._schema_type is not None and parsed is not None and isinstance(parsed, dict):
            return cast(OutputT, self._schema_type.model_validate(parsed))

        return cast(OutputT, parsed)

    @cached_property
    def messages(self) -> list[Message]:
        """All messages including request history and the response message."""
        if self.message is None:
            return [Message(m) for m in self.request.messages] if self.request else []
        return [
            *(Message(m) for m in (self.request.messages if self.request else [])),
            self.message,
        ]

    @cached_property
    def tool_requests(self) -> list[ToolRequestPart]:
        """All tool request parts in the response message."""
        if self.message is None:
            return []
        return self.message.tool_requests

    @cached_property
    def media(self) -> list[Media]:
        """All media parts in the response message."""
        if self.message is None:
            return []
        return [
            part.root.media
            for part in self.message.content
            if isinstance(part.root, MediaPart) and part.root.media is not None
        ]

    @cached_property
    def interrupts(self) -> list[ToolRequestPart]:
        """Tool requests marked as interrupted."""
        if self.message is None:
            return []
        return self.message.interrupts

interrupts cached property

Tool requests marked as interrupted.

media cached property

All media parts in the response message.

messages cached property

All messages including request history and the response message.

output cached property

Parsed JSON output from the response text, validated against schema if set.

text cached property

All text parts concatenated into a single string.

tool_requests cached property

All tool request parts in the response message.

__eq__(other)

Compare responses by message and finish_reason.

Source code in packages/genkit/src/genkit/_core/_model.py
333
334
335
336
337
def __eq__(self, other: object) -> bool:
    """Compare responses by message and finish_reason."""
    if isinstance(other, ModelResponse):
        return self.message == other.message and self.finish_reason == other.finish_reason
    return super().__eq__(other)

__hash__()

Return identity-based hash.

Source code in packages/genkit/src/genkit/_core/_model.py
339
340
341
def __hash__(self) -> int:
    """Return identity-based hash."""
    return hash(id(self))

assert_valid()

Validate response structure. (TODO: not yet implemented).

Source code in packages/genkit/src/genkit/_core/_model.py
323
324
325
326
def assert_valid(self) -> None:
    """Validate response structure. (TODO: not yet implemented)."""
    # TODO(#4343): implement
    pass

assert_valid_schema()

Validate response conforms to output schema. (TODO: not yet implemented).

Source code in packages/genkit/src/genkit/_core/_model.py
328
329
330
331
def assert_valid_schema(self) -> None:
    """Validate response conforms to output schema. (TODO: not yet implemented)."""
    # TODO(#4343): implement
    pass

model_post_init(__context)

Initialize default usage and custom dict if not provided.

Source code in packages/genkit/src/genkit/_core/_model.py
316
317
318
319
320
321
def model_post_init(self, __context: object) -> None:
    """Initialize default usage and custom dict if not provided."""
    if self.usage is None:
        self.usage = GenerationUsage()
    if self.custom is None:
        self.custom = {}

genkit.model.ModelResponseChunk

Bases: GenerateResponseChunk, Generic[OutputT]

Streaming chunk with text, accumulated text, and output parsing.

Source code in packages/genkit/src/genkit/_core/_model.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class ModelResponseChunk(GenerateResponseChunk, Generic[OutputT]):
    """Streaming chunk with text, accumulated text, and output parsing."""

    # Field(exclude=True) means these fields are not included in serialization
    previous_chunks: list[ModelResponseChunk[Any]] = Field(default_factory=list, exclude=True)
    chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = Field(None, exclude=True)

    def __init__(
        self,
        chunk: ModelResponseChunk[Any] | None = None,
        previous_chunks: list[ModelResponseChunk[Any]] | None = None,
        index: int | float | None = None,
        chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = None,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize from a chunk or keyword arguments."""
        if chunk is not None:
            # Framework wrapping mode
            super().__init__(
                role=chunk.role,
                index=index,
                content=chunk.content,
                custom=chunk.custom,
                aggregated=chunk.aggregated,
            )
        else:
            # No source chunk — caller passes fields (role, content, etc.) as kwargs directly
            super().__init__(**kwargs)
        self.previous_chunks = previous_chunks or []
        self.chunk_parser = chunk_parser

    def __eq__(self, other: object) -> bool:
        """Check equality."""
        if isinstance(other, ModelResponseChunk):
            return self.role == other.role and self.content == other.content
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """Text content of this chunk."""
        parts: list[str] = []
        for p in self.content:
            text_val = p.root.text
            if text_val is not None:
                # Handle Text RootModel (access .root) or plain str
                if isinstance(text_val, Text):
                    parts.append(str(text_val.root) if text_val.root is not None else '')
                else:
                    parts.append(str(text_val))
        return ''.join(parts)

    @cached_property
    def accumulated_text(self) -> str:
        """Text from all previous chunks plus this chunk."""
        parts: list[str] = []
        if self.previous_chunks:
            for chunk in self.previous_chunks:
                for p in chunk.content:
                    text_val = p.root.text
                    if text_val:
                        # Handle Text RootModel (access .root) or plain str
                        if isinstance(text_val, Text):
                            parts.append(str(text_val.root) if text_val.root is not None else '')
                        else:
                            parts.append(str(text_val))
        return ''.join(parts) + self.text

    @cached_property
    def output(self) -> OutputT:
        """Parsed JSON output from accumulated text."""
        if self.chunk_parser:
            return cast(OutputT, self.chunk_parser(self))
        return cast(OutputT, extract_json(self.accumulated_text))

accumulated_text cached property

Text from all previous chunks plus this chunk.

output cached property

Parsed JSON output from accumulated text.

text cached property

Text content of this chunk.

__eq__(other)

Check equality.

Source code in packages/genkit/src/genkit/_core/_model.py
433
434
435
436
437
def __eq__(self, other: object) -> bool:
    """Check equality."""
    if isinstance(other, ModelResponseChunk):
        return self.role == other.role and self.content == other.content
    return super().__eq__(other)

__hash__()

Return hash.

Source code in packages/genkit/src/genkit/_core/_model.py
439
440
441
def __hash__(self) -> int:
    """Return hash."""
    return hash(id(self))

__init__(chunk=None, previous_chunks=None, index=None, chunk_parser=None, **kwargs)

Initialize from a chunk or keyword arguments.

Source code in packages/genkit/src/genkit/_core/_model.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def __init__(
    self,
    chunk: ModelResponseChunk[Any] | None = None,
    previous_chunks: list[ModelResponseChunk[Any]] | None = None,
    index: int | float | None = None,
    chunk_parser: Callable[[ModelResponseChunk[Any]], object] | None = None,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize from a chunk or keyword arguments."""
    if chunk is not None:
        # Framework wrapping mode
        super().__init__(
            role=chunk.role,
            index=index,
            content=chunk.content,
            custom=chunk.custom,
            aggregated=chunk.aggregated,
        )
    else:
        # No source chunk — caller passes fields (role, content, etc.) as kwargs directly
        super().__init__(**kwargs)
    self.previous_chunks = previous_chunks or []
    self.chunk_parser = chunk_parser

genkit.model.ModelUsage = GenerationUsage module-attribute

genkit.model.Candidate

Bases: GenkitModel

Model for candidate data.

Source code in packages/genkit/src/genkit/_core/_typing.py
205
206
207
208
209
210
211
212
213
214
class Candidate(GenkitModel):
    """Model for candidate data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    index: float = Field(...)
    message: MessageData = Field(...)
    usage: GenerationUsage | None = None
    finish_reason: FinishReason = Field(...)
    finish_message: str | None = None
    custom: Any | None = Field(default=None)

genkit.model.FinishReason

Bases: StrEnum

FinishReason data type class.

Source code in packages/genkit/src/genkit/_core/_typing.py
45
46
47
48
49
50
51
52
53
class FinishReason(StrEnum):
    """FinishReason data type class."""

    STOP = 'stop'
    LENGTH = 'length'
    BLOCKED = 'blocked'
    INTERRUPTED = 'interrupted'
    OTHER = 'other'
    UNKNOWN = 'unknown'

genkit.model.GenerateActionOptions

Bases: GenerateActionOptionsData

Generate options with messages as list[Message] for type-safe use with ai.generate().

Source code in packages/genkit/src/genkit/_core/_model.py
134
135
136
137
138
139
140
141
142
class GenerateActionOptions(GenerateActionOptionsData):
    """Generate options with messages as list[Message] for type-safe use with ai.generate()."""

    messages: list[Message]

    @field_validator('messages', mode='before')
    @classmethod
    def _wrap_messages(cls, v: list[MessageData]) -> list[Message]:
        return [m if isinstance(m, Message) else Message(m) for m in v]

genkit.model.Error

Bases: GenkitModel

Model for error data.

Source code in packages/genkit/src/genkit/_core/_typing.py
815
816
817
818
819
class Error(GenkitModel):
    """Model for error data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='allow', populate_by_name=True)
    message: str = Field(...)

genkit.model.Operation

Bases: GenkitModel

Model for operation data.

Source code in packages/genkit/src/genkit/_core/_typing.py
385
386
387
388
389
390
391
392
393
394
class Operation(GenkitModel):
    """Model for operation data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    action: str | None = None
    id: str = Field(...)
    done: bool | None = None
    output: Any | None = Field(default=None)
    error: Error | None = None
    metadata: Metadata | None = None

genkit.model.ToolRequest

Bases: GenkitModel

Model for toolrequest data.

Source code in packages/genkit/src/genkit/_core/_typing.py
507
508
509
510
511
512
513
514
class ToolRequest(GenkitModel):
    """Model for toolrequest data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    ref: str | None = None
    name: str = Field(...)
    input: Any | None = Field(default=None)
    partial: bool | None = None

genkit.model.ToolDefinition

Bases: GenkitModel

Model for tooldefinition data.

Source code in packages/genkit/src/genkit/_core/_typing.py
454
455
456
457
458
459
460
461
462
463
464
465
466
class ToolDefinition(GenkitModel):
    """Model for tooldefinition data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    name: str = Field(...)
    description: str = Field(...)
    input_schema: Any | dict[str, Any] | None = Field(
        default=None, description='Valid JSON Schema representing the input of the tool.'
    )
    output_schema: Any | dict[str, Any] | None = Field(
        default=None, description='Valid JSON Schema describing the output of the tool.'
    )
    metadata: Metadata | None = None

genkit.model.ToolResponse

Bases: GenkitModel

Model for toolresponse data.

Source code in packages/genkit/src/genkit/_core/_typing.py
517
518
519
520
521
522
523
524
class ToolResponse(GenkitModel):
    """Model for toolresponse data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    ref: str | None = None
    name: str = Field(...)
    output: Any | None = Field(default=None)
    content: list[Any] | None = None

genkit.model.ModelInfo

Bases: GenkitModel

Model for modelinfo data.

Source code in packages/genkit/src/genkit/_core/_typing.py
346
347
348
349
350
351
352
353
354
class ModelInfo(GenkitModel):
    """Model for modelinfo data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    versions: list[str] | None = None
    label: str | None = None
    config_schema: ConfigSchema | None = None
    supports: Supports | None = None
    stage: Stage | None = None

genkit.model.Supports

Bases: GenkitModel

Model for supports data.

Source code in packages/genkit/src/genkit/_core/_typing.py
799
800
801
802
803
804
805
806
807
808
809
810
811
812
class Supports(GenkitModel):
    """Model for supports data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    multiturn: bool | None = None
    media: bool | None = None
    tools: bool | None = None
    system_role: bool | None = None
    output: list[str] | None = None
    content_type: list[str] | None = None
    context: bool | None = None
    constrained: Constrained | None = None
    tool_choice: bool | None = None
    long_running: bool | None = None

genkit.model.Constrained

Bases: StrEnum

Constrained generation support (none, all, no-tools).

Source code in packages/genkit/src/genkit/_core/_typing.py
906
907
908
909
910
911
class Constrained(StrEnum):
    """Constrained generation support (none, all, no-tools)."""

    NONE = 'none'
    ALL = 'all'
    NO_TOOLS = 'no-tools'

genkit.model.Stage

Bases: StrEnum

Model stage (featured, stable, unstable, legacy, deprecated).

Source code in packages/genkit/src/genkit/_core/_typing.py
914
915
916
917
918
919
920
921
class Stage(StrEnum):
    """Model stage (featured, stable, unstable, legacy, deprecated)."""

    FEATURED = 'featured'
    STABLE = 'stable'
    UNSTABLE = 'unstable'
    LEGACY = 'legacy'
    DEPRECATED = 'deprecated'

genkit.model.model_action_metadata(name, info=None, config_schema=None)

Create ActionMetadata for a model action.

Source code in packages/genkit/src/genkit/_ai/_model.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def model_action_metadata(
    name: str,
    info: dict[str, object] | None = None,
    config_schema: type | dict[str, Any] | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for a model action."""
    info = info if info is not None else {}
    return ActionMetadata(
        action_type=ActionKind.MODEL,
        name=name,
        input_json_schema=to_json_schema(ModelRequest),
        output_json_schema=to_json_schema(ModelResponse),
        metadata={'model': {**info, 'customOptions': to_json_schema(config_schema) if config_schema else None}},
    )

genkit.model.model_ref(name, namespace=None, info=None, version=None, config=None)

Create a ModelRef, optionally prefixing name with namespace.

Source code in packages/genkit/src/genkit/_ai/_model.py
69
70
71
72
73
74
75
76
77
78
79
80
def model_ref(
    name: str,
    namespace: str | None = None,
    info: ModelInfo | None = None,
    version: str | None = None,
    config: dict[str, object] | None = None,
) -> ModelRef:
    """Create a ModelRef, optionally prefixing name with namespace."""
    # Logic: if (options.namespace && !name.startsWith(options.namespace + '/'))
    final_name = f'{namespace}/{name}' if namespace and not name.startswith(f'{namespace}/') else name

    return ModelRef(name=final_name, info=info, version=version, config=config)

genkit.model.ModelRef

Bases: BaseModel

Reference to a model with configuration.

Source code in packages/genkit/src/genkit/_core/_model.py
70
71
72
73
74
75
76
77
class ModelRef(BaseModel):
    """Reference to a model with configuration."""

    name: str
    config_schema: object | None = None
    info: object | None = None
    version: str | None = None
    config: dict[str, object] | None = None

genkit.model.ModelConfig = GenerationCommonConfig module-attribute

genkit.model.Message

Bases: MessageData

Message wrapper with utility properties for text and tool requests.

Source code in packages/genkit/src/genkit/_core/_model.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class Message(MessageData):
    """Message wrapper with utility properties for text and tool requests."""

    def __init__(
        self,
        message: MessageData | None = None,
        **kwargs: object,
    ) -> None:
        """Initialize from MessageData or keyword arguments."""
        if message is not None:
            if isinstance(message, dict):
                role = message.get('role')
                if role is None:
                    raise ValueError('Message role is required')
                super().__init__(
                    role=role,
                    content=message.get('content', []),
                    metadata=message.get('metadata'),
                )
            else:
                super().__init__(
                    role=message.role,
                    content=message.content,
                    metadata=message.metadata,
                )
        else:
            super().__init__(**kwargs)  # type: ignore[arg-type]

    def __eq__(self, other: object) -> bool:
        """Compare messages by role, content, and metadata."""
        if isinstance(other, MessageData):
            return self.role == other.role and self.content == other.content and self.metadata == other.metadata
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return identity-based hash."""
        return hash(id(self))

    @cached_property
    def text(self) -> str:
        """All text parts concatenated into a single string."""
        return text_from_message(self)

    @cached_property
    def tool_requests(self) -> list[ToolRequestPart]:
        """All tool request parts in this message."""
        return [p.root for p in self.content if isinstance(p.root, ToolRequestPart)]

    @cached_property
    def interrupts(self) -> list[ToolRequestPart]:
        """Tool requests marked as interrupted."""
        return [p for p in self.tool_requests if p.metadata and p.metadata.get('interrupt')]

interrupts cached property

Tool requests marked as interrupted.

text cached property

All text parts concatenated into a single string.

tool_requests cached property

All tool request parts in this message.

__eq__(other)

Compare messages by role, content, and metadata.

Source code in packages/genkit/src/genkit/_core/_model.py
108
109
110
111
112
def __eq__(self, other: object) -> bool:
    """Compare messages by role, content, and metadata."""
    if isinstance(other, MessageData):
        return self.role == other.role and self.content == other.content and self.metadata == other.metadata
    return super().__eq__(other)

__hash__()

Return identity-based hash.

Source code in packages/genkit/src/genkit/_core/_model.py
114
115
116
def __hash__(self) -> int:
    """Return identity-based hash."""
    return hash(id(self))

__init__(message=None, **kwargs)

Initialize from MessageData or keyword arguments.

Source code in packages/genkit/src/genkit/_core/_model.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(
    self,
    message: MessageData | None = None,
    **kwargs: object,
) -> None:
    """Initialize from MessageData or keyword arguments."""
    if message is not None:
        if isinstance(message, dict):
            role = message.get('role')
            if role is None:
                raise ValueError('Message role is required')
            super().__init__(
                role=role,
                content=message.get('content', []),
                metadata=message.get('metadata'),
            )
        else:
            super().__init__(
                role=message.role,
                content=message.content,
                metadata=message.metadata,
            )
    else:
        super().__init__(**kwargs)  # type: ignore[arg-type]

genkit.model.get_basic_usage_stats(input_, response)

Calculate usage stats (characters, media counts) from messages.

Source code in packages/genkit/src/genkit/_core/_model.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def get_basic_usage_stats(input_: list[Message], response: Message) -> GenerationUsage:
    """Calculate usage stats (characters, media counts) from messages."""
    request_parts: list[Part] = []
    for msg in input_:
        request_parts.extend(msg.content)

    response_parts = response.content

    def count_parts(parts: list[Part]) -> tuple[int, int, int, int]:
        """Count characters, images, videos, audio in parts."""
        characters = 0
        images = 0
        videos = 0
        audio = 0

        for part in parts:
            text_val = part.root.text
            if text_val:
                if isinstance(text_val, Text):
                    characters += len(str(text_val.root)) if text_val.root else 0
                else:
                    characters += len(str(text_val))

            media = part.root.media
            if media:
                if isinstance(media, Media):
                    content_type = media.content_type or ''
                    url = media.url or ''
                elif isinstance(media, MediaModel) and hasattr(media.root, 'content_type'):
                    content_type = getattr(media.root, 'content_type', '') or ''
                    url = getattr(media.root, 'url', '') or ''
                else:
                    content_type = ''
                    url = ''

                if content_type.startswith('image') or url.startswith('data:image'):
                    images += 1
                elif content_type.startswith('video') or url.startswith('data:video'):
                    videos += 1
                elif content_type.startswith('audio') or url.startswith('data:audio'):
                    audio += 1

        return characters, images, videos, audio

    in_chars, in_imgs, in_vids, in_audio = count_parts(request_parts)
    out_chars, out_imgs, out_vids, out_audio = count_parts(response_parts)

    return GenerationUsage(
        input_characters=in_chars,
        input_images=in_imgs,
        input_videos=in_vids,
        input_audio_files=in_audio,
        output_characters=out_chars,
        output_images=out_imgs,
        output_videos=out_vids,
        output_audio_files=out_audio,
    )

genkit.embedder

genkit.embedder.EmbedRequest

Bases: GenkitModel

Model for embedrequest data.

Source code in packages/genkit/src/genkit/_core/_typing.py
90
91
92
93
94
95
class EmbedRequest(GenkitModel):
    """Model for embedrequest data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    input: list[DocumentData] = Field(...)
    options: Any | None = Field(default=None)

genkit.embedder.EmbedResponse

Bases: GenkitModel

Model for embedresponse data.

Source code in packages/genkit/src/genkit/_core/_typing.py
 98
 99
100
101
102
class EmbedResponse(GenkitModel):
    """Model for embedresponse data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    embeddings: list[Embedding] = Field(...)

genkit.embedder.Embedding

Bases: GenkitModel

Model for embedding data.

Source code in packages/genkit/src/genkit/_core/_typing.py
105
106
107
108
109
110
class Embedding(GenkitModel):
    """Model for embedding data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    embedding: list[float] = Field(...)
    metadata: Metadata | None = None

genkit.embedder.embedder_action_metadata(name, options=None)

Create ActionMetadata for an embedder action.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def embedder_action_metadata(
    name: str,
    options: EmbedderOptions | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for an embedder action."""
    options = options if options is not None else EmbedderOptions()
    embedder_metadata_dict: dict[str, object] = {'embedder': {}}
    embedder_info = cast(dict[str, object], embedder_metadata_dict['embedder'])

    if options.label:
        embedder_info['label'] = options.label

    embedder_info['dimensions'] = options.dimensions

    if options.supports:
        embedder_info['supports'] = options.supports.model_dump(exclude_none=True, by_alias=True)

    embedder_info['customOptions'] = options.config_schema if options.config_schema else None

    return ActionMetadata(
        action_type=ActionKind.EMBEDDER,
        name=name,
        input_json_schema=to_json_schema(EmbedRequest),
        output_json_schema=to_json_schema(EmbedResponse),
        metadata=embedder_metadata_dict,
    )

genkit.embedder.embedder_ref(name, config=None, version=None)

Creates an EmbedderRef instance.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
114
115
116
def create_embedder_ref(name: str, config: dict[str, Any] | None = None, version: str | None = None) -> EmbedderRef:
    """Creates an EmbedderRef instance."""
    return EmbedderRef(name=name, config=config, version=version)

genkit.embedder.EmbedderRef

Bases: BaseModel

Reference to an embedder with configuration.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
53
54
55
56
57
58
59
60
class EmbedderRef(BaseModel):
    """Reference to an embedder with configuration."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True)

    name: str
    config: Any | None = None
    version: str | None = None

genkit.embedder.EmbedderSupports

Bases: BaseModel

Embedder capability support.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
33
34
35
36
37
38
39
class EmbedderSupports(BaseModel):
    """Embedder capability support."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True)

    input: list[str] | None = None
    multilingual: bool | None = None

genkit.embedder.EmbedderOptions

Bases: BaseModel

Configuration options for an embedder.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
42
43
44
45
46
47
48
49
50
class EmbedderOptions(BaseModel):
    """Configuration options for an embedder."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True, alias_generator=to_camel)

    config_schema: dict[str, Any] | None = None
    label: str | None = None
    supports: EmbedderSupports | None = None
    dimensions: int | None = None

genkit.plugin_api

genkit.plugin_api.Plugin

Bases: ABC

Abstract base class for Genkit plugins.

Source code in packages/genkit/src/genkit/_core/_plugin.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Plugin(abc.ABC):
    """Abstract base class for Genkit plugins."""

    name: str  # plugin namespace

    @abc.abstractmethod
    async def init(self) -> list[Action]:
        """Lazy warm-up called once per plugin; return actions to pre-register."""
        ...

    @abc.abstractmethod
    async def resolve(self, action_type: ActionKind, name: str) -> Action | None:
        """Resolve a single action by kind and namespaced name."""
        ...

    @abc.abstractmethod
    async def list_actions(self) -> list[ActionMetadata]:
        """Return advertised actions for dev UI/reflection listing.

        ``ActionMetadata.action_type`` must be set (typically ``ActionKind.*``) and
        ``ActionMetadata.name`` must match resolution keys (typically
        ``{plugin.name}/localName`` for plugin-backed actions).
        """
        ...

    async def model(self, name: str) -> Action | None:
        """Resolve a model action by name (local or namespaced)."""
        target = name if '/' in name else f'{self.name}/{name}'
        return await self.resolve(ActionKind.MODEL, target)

    async def embedder(self, name: str) -> Action | None:
        """Resolve an embedder action by name (local or namespaced)."""
        target = name if '/' in name else f'{self.name}/{name}'
        return await self.resolve(ActionKind.EMBEDDER, target)

embedder(name) async

Resolve an embedder action by name (local or namespaced).

Source code in packages/genkit/src/genkit/_core/_plugin.py
55
56
57
58
async def embedder(self, name: str) -> Action | None:
    """Resolve an embedder action by name (local or namespaced)."""
    target = name if '/' in name else f'{self.name}/{name}'
    return await self.resolve(ActionKind.EMBEDDER, target)

init() abstractmethod async

Lazy warm-up called once per plugin; return actions to pre-register.

Source code in packages/genkit/src/genkit/_core/_plugin.py
30
31
32
33
@abc.abstractmethod
async def init(self) -> list[Action]:
    """Lazy warm-up called once per plugin; return actions to pre-register."""
    ...

list_actions() abstractmethod async

Return advertised actions for dev UI/reflection listing.

ActionMetadata.action_type must be set (typically ActionKind.*) and ActionMetadata.name must match resolution keys (typically {plugin.name}/localName for plugin-backed actions).

Source code in packages/genkit/src/genkit/_core/_plugin.py
40
41
42
43
44
45
46
47
48
@abc.abstractmethod
async def list_actions(self) -> list[ActionMetadata]:
    """Return advertised actions for dev UI/reflection listing.

    ``ActionMetadata.action_type`` must be set (typically ``ActionKind.*``) and
    ``ActionMetadata.name`` must match resolution keys (typically
    ``{plugin.name}/localName`` for plugin-backed actions).
    """
    ...

model(name) async

Resolve a model action by name (local or namespaced).

Source code in packages/genkit/src/genkit/_core/_plugin.py
50
51
52
53
async def model(self, name: str) -> Action | None:
    """Resolve a model action by name (local or namespaced)."""
    target = name if '/' in name else f'{self.name}/{name}'
    return await self.resolve(ActionKind.MODEL, target)

resolve(action_type, name) abstractmethod async

Resolve a single action by kind and namespaced name.

Source code in packages/genkit/src/genkit/_core/_plugin.py
35
36
37
38
@abc.abstractmethod
async def resolve(self, action_type: ActionKind, name: str) -> Action | None:
    """Resolve a single action by kind and namespaced name."""
    ...

genkit.plugin_api.Action

Bases: Generic[InputT, OutputT, ChunkT]

A named, traced, remotely callable function.

Source code in packages/genkit/src/genkit/_core/_action.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
class Action(Generic[InputT, OutputT, ChunkT]):
    """A named, traced, remotely callable function."""

    def __init__(
        self,
        kind: ActionKind,
        name: str,
        fn: Callable[..., Awaitable[OutputT]],
        metadata_fn: Callable[..., object] | None = None,
        description: str | None = None,
        metadata: dict[str, object] | None = None,
        span_metadata: dict[str, SpanAttributeValue] | None = None,
    ) -> None:
        self._kind: ActionKind = kind
        self._name: str = name
        self._metadata: dict[str, object] = metadata if metadata else {}
        self._description: str | None = description
        self._span_metadata: dict[str, SpanAttributeValue] = span_metadata or {}
        # Optional matcher function for resource actions
        self.matches: Callable[[object], bool] | None = None

        # All action handlers must be async
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"Action handlers must be async functions. Got sync function for '{name}'.")

        input_spec = inspect.getfullargspec(metadata_fn if metadata_fn else fn)
        try:
            resolved_annotations = get_type_hints(metadata_fn if metadata_fn else fn)
        except (NameError, TypeError, AttributeError):
            resolved_annotations = input_spec.annotations
        action_args, arg_types = extract_action_args_and_types(input_spec, resolved_annotations)
        # Raw user fn; tracing/dispatch handled by _run_with_telemetry / _invoke.
        self._fn: Callable[..., Awaitable[OutputT]] = fn
        self._n_action_args: int = len(action_args)
        self._initialize_io_schemas(action_args, arg_types, resolved_annotations, input_spec)

    @property
    def kind(self) -> ActionKind:
        return self._kind

    @property
    def name(self) -> str:
        return self._name

    @property
    def description(self) -> str | None:
        return self._description

    @property
    def metadata(self) -> dict[str, object]:
        return self._metadata

    @property
    def input_type(self) -> TypeAdapter[InputT] | None:
        return self._input_type

    @property
    def input_schema(self) -> dict[str, object]:
        return self._input_schema

    @input_schema.setter
    def input_schema(self, value: dict[str, object]) -> None:
        self._input_schema = value
        self._metadata[ActionMetadataKey.INPUT_KEY] = value

    @property
    def output_schema(self) -> dict[str, object]:
        return self._output_schema

    @output_schema.setter
    def output_schema(self, value: dict[str, object]) -> None:
        self._output_schema = value
        self._metadata[ActionMetadataKey.OUTPUT_KEY] = value

    def _override_input_schema(
        self,
        input_schema: type[BaseModel] | dict[str, object],
    ) -> None:
        """Replace inferred input JSON Schema and validation type (e.g. tool schema overrides)."""
        in_js = to_json_schema(input_schema)
        self.input_schema = in_js
        if isinstance(input_schema, dict):
            self._input_type = None
        else:
            self._input_type = cast(TypeAdapter[InputT], TypeAdapter(input_schema))

    async def __call__(self, input: InputT | None = None) -> OutputT:
        """Call the action directly, returning just the response value."""
        return (await self.run(input)).response

    async def run(
        self,
        input: InputT | None = None,
        on_chunk: Callable[[ChunkT], None] | None = None,
        context: dict[str, object] | None = None,
        on_trace_start: Callable[[str, str], Awaitable[None]] | None = None,
        telemetry_labels: dict[str, object] | None = None,
    ) -> ActionResponse[OutputT]:
        """Execute the action with optional input validation.

        Args:
            input: The input to the action. Will be validated against the input schema.
            on_chunk: Optional streaming callback for chunked responses.
            context: Optional context dict for the action.
            on_trace_start: Optional callback invoked when trace starts.
            telemetry_labels: Custom labels to set as direct span attributes.

        Returns:
            ActionResponse containing the result and trace metadata.

        Raises:
            GenkitError: If input validation fails (INVALID_ARGUMENT status).
        """
        # Validate input if we have a schema
        if self._input_type is not None:
            try:
                input = self._input_type.validate_python(input)
            except ValidationError as e:
                if input is None:
                    raise GenkitError(
                        message=(
                            f"Action '{self.name}' requires input but none was provided. "
                            'Please supply a valid input payload.'
                        ),
                        status='INVALID_ARGUMENT',
                    ) from e
                raise GenkitError(
                    message=f"Invalid input for action '{self.name}': {e}",
                    status='INVALID_ARGUMENT',
                    cause=e,
                ) from e

        if context:
            _ = _action_context.set(context)

        streaming_cb = cast(StreamingCallback, on_chunk) if on_chunk else None

        return await self._run_with_telemetry(
            input,
            ActionRunContext(
                context=_action_context.get(None),
                streaming_callback=streaming_cb,
            ),
            on_trace_start,
            telemetry_labels,
        )

    def stream(
        self,
        input: InputT | None = None,
        context: dict[str, object] | None = None,
        telemetry_labels: dict[str, object] | None = None,
        timeout: float | None = None,
    ) -> StreamResponse[ChunkT, OutputT]:
        """Execute and return a StreamResponse with .stream and .response properties."""
        channel: Channel[ChunkT, ActionResponse[OutputT]] = Channel(timeout=timeout)

        def send_chunk(c: ChunkT) -> None:
            channel.send(c)

        resp = self.run(
            input=input,
            context=context,
            telemetry_labels=telemetry_labels,
            on_chunk=send_chunk,
        )
        channel.set_close_future(asyncio.create_task(resp))

        result_future: asyncio.Future[OutputT] = asyncio.Future()
        channel.closed.add_done_callback(lambda _: result_future.set_result(channel.closed.result().response))

        return StreamResponse(stream=channel, response=result_future)

    def _initialize_io_schemas(
        self,
        action_args: list[str],
        arg_types: list[type],
        annotations: dict[str, Any],
        _input_spec: inspect.FullArgSpec,
    ) -> None:
        # Allow up to 2 args: (input, ctx) - use ctx.send_chunk() for streaming
        if len(action_args) > 2:
            raise TypeError(f'can only have up to 2 args: {action_args}')

        if len(action_args) > 0:
            type_adapter = TypeAdapter(arg_types[0])
            self._input_schema: dict[str, object] = type_adapter.json_schema()
            self._input_type: TypeAdapter[InputT] | None = cast(TypeAdapter[InputT], type_adapter)
            self._metadata[ActionMetadataKey.INPUT_KEY] = self._input_schema
        else:
            self._input_schema = TypeAdapter(object).json_schema()
            self._input_type = None
            self._metadata[ActionMetadataKey.INPUT_KEY] = self._input_schema

        if ActionMetadataKey.RETURN in annotations:
            type_adapter = TypeAdapter(annotations[ActionMetadataKey.RETURN])
            self._output_schema: dict[str, object] = type_adapter.json_schema()
            self._metadata[ActionMetadataKey.OUTPUT_KEY] = self._output_schema
        else:
            self._output_schema = TypeAdapter(object).json_schema()
            self._metadata[ActionMetadataKey.OUTPUT_KEY] = self._output_schema

    async def _run_with_telemetry(
        self,
        input: object | None,
        ctx: ActionRunContext,
        on_trace_start: Callable[[str, str], Awaitable[None]] | None,
        telemetry_labels: dict[str, object] | None,
    ) -> ActionResponse[OutputT]:
        """Open the action span via ``run_in_new_span``, dispatch ``self._fn``, wrap errors in ``GenkitError``."""
        start_time = time.perf_counter()
        suppress = str((telemetry_labels or {}).get('genkitx:ignore-trace', '')).lower() == 'true'
        suppress_token = suppress_telemetry.set(True) if suppress else None

        # ``type``/``subtype`` set canonical genkit:type / genkit:metadata:subtype attrs.
        # ``self._span_metadata`` uses short keys; run_in_new_span auto-prefixes them with
        # ``genkit:metadata:``. ``telemetry_labels`` are caller-controlled passthrough attrs.
        extra_metadata: dict[str, str] = {k: str(v) for k, v in self._span_metadata.items()}
        # Surface action context (auth, headers, etc.) on the span so the Dev UI
        # trace inspector can render the "Context" panel for a flow run.
        if ctx.context:
            try:
                extra_metadata['context'] = json.dumps(ctx.context)
            except Exception:
                try:
                    cleaned_context = _sanitize_value(ctx.context)
                    extra_metadata['context'] = json.dumps(cleaned_context)
                except Exception:
                    extra_metadata['context'] = str(ctx.context)
        span_meta = SpanMetadata(
            name=self._name,
            type='action',
            subtype=str(self._kind),
            input=input,
            metadata=extra_metadata or None,
            telemetry_labels={k: str(v) for k, v in (telemetry_labels or {}).items()} or None,
        )

        trace_id = ''
        try:
            with run_in_new_span(span_meta) as span:
                # OpenTelemetry standard hex format.
                trace_id = format(span.get_span_context().trace_id, '032x')
                span_id = format(span.get_span_context().span_id, '016x')
                if on_trace_start:
                    await on_trace_start(trace_id, span_id)

                output = await self._invoke(input, ctx)
                output = cast(OutputT, _record_latency(output, start_time))
                # Picked up by run_in_new_span's success branch and written as ``genkit:output``.
                span_meta.output = output
                return ActionResponse(response=output, trace_id=trace_id, span_id=span_id)
        except GenkitError:
            raise
        except Exception as e:
            # Wrap outside the with-block so we don't clobber ``genkit:error`` (which
            # ``run_in_new_span`` already set to ``str(original_e)``).
            raise GenkitError(
                cause=e,
                message=f'Error while running action {self._name}',
                trace_id=trace_id,
            ) from e
        finally:
            if suppress_token is not None:
                suppress_telemetry.reset(suppress_token)

    async def _invoke(self, input: object | None, ctx: ActionRunContext) -> OutputT:
        """Dispatch ``self._fn`` based on its declared arity (0/1/2 args)."""
        match self._n_action_args:
            case 0:
                return await self._fn()
            case 1:
                return await self._fn(input)
            case 2:
                return await self._fn(input, ctx)
            case _:
                raise ValueError('action fn must have 0-2 args')

__call__(input=None) async

Call the action directly, returning just the response value.

Source code in packages/genkit/src/genkit/_core/_action.py
439
440
441
async def __call__(self, input: InputT | None = None) -> OutputT:
    """Call the action directly, returning just the response value."""
    return (await self.run(input)).response

run(input=None, on_chunk=None, context=None, on_trace_start=None, telemetry_labels=None) async

Execute the action with optional input validation.

Parameters:

Name Type Description Default
input InputT | None

The input to the action. Will be validated against the input schema.

None
on_chunk Callable[[ChunkT], None] | None

Optional streaming callback for chunked responses.

None
context dict[str, object] | None

Optional context dict for the action.

None
on_trace_start Callable[[str, str], Awaitable[None]] | None

Optional callback invoked when trace starts.

None
telemetry_labels dict[str, object] | None

Custom labels to set as direct span attributes.

None

Returns:

Type Description
ActionResponse[OutputT]

ActionResponse containing the result and trace metadata.

Raises:

Type Description
GenkitError

If input validation fails (INVALID_ARGUMENT status).

Source code in packages/genkit/src/genkit/_core/_action.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
async def run(
    self,
    input: InputT | None = None,
    on_chunk: Callable[[ChunkT], None] | None = None,
    context: dict[str, object] | None = None,
    on_trace_start: Callable[[str, str], Awaitable[None]] | None = None,
    telemetry_labels: dict[str, object] | None = None,
) -> ActionResponse[OutputT]:
    """Execute the action with optional input validation.

    Args:
        input: The input to the action. Will be validated against the input schema.
        on_chunk: Optional streaming callback for chunked responses.
        context: Optional context dict for the action.
        on_trace_start: Optional callback invoked when trace starts.
        telemetry_labels: Custom labels to set as direct span attributes.

    Returns:
        ActionResponse containing the result and trace metadata.

    Raises:
        GenkitError: If input validation fails (INVALID_ARGUMENT status).
    """
    # Validate input if we have a schema
    if self._input_type is not None:
        try:
            input = self._input_type.validate_python(input)
        except ValidationError as e:
            if input is None:
                raise GenkitError(
                    message=(
                        f"Action '{self.name}' requires input but none was provided. "
                        'Please supply a valid input payload.'
                    ),
                    status='INVALID_ARGUMENT',
                ) from e
            raise GenkitError(
                message=f"Invalid input for action '{self.name}': {e}",
                status='INVALID_ARGUMENT',
                cause=e,
            ) from e

    if context:
        _ = _action_context.set(context)

    streaming_cb = cast(StreamingCallback, on_chunk) if on_chunk else None

    return await self._run_with_telemetry(
        input,
        ActionRunContext(
            context=_action_context.get(None),
            streaming_callback=streaming_cb,
        ),
        on_trace_start,
        telemetry_labels,
    )

stream(input=None, context=None, telemetry_labels=None, timeout=None)

Execute and return a StreamResponse with .stream and .response properties.

Source code in packages/genkit/src/genkit/_core/_action.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def stream(
    self,
    input: InputT | None = None,
    context: dict[str, object] | None = None,
    telemetry_labels: dict[str, object] | None = None,
    timeout: float | None = None,
) -> StreamResponse[ChunkT, OutputT]:
    """Execute and return a StreamResponse with .stream and .response properties."""
    channel: Channel[ChunkT, ActionResponse[OutputT]] = Channel(timeout=timeout)

    def send_chunk(c: ChunkT) -> None:
        channel.send(c)

    resp = self.run(
        input=input,
        context=context,
        telemetry_labels=telemetry_labels,
        on_chunk=send_chunk,
    )
    channel.set_close_future(asyncio.create_task(resp))

    result_future: asyncio.Future[OutputT] = asyncio.Future()
    channel.closed.add_done_callback(lambda _: result_future.set_result(channel.closed.result().response))

    return StreamResponse(stream=channel, response=result_future)

genkit.plugin_api.ActionMetadata

Bases: GenkitModel

Model for actionmetadata data.

Source code in packages/genkit/src/genkit/_core/_typing.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
class ActionMetadata(GenkitModel):
    """Model for actionmetadata data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    key: str | None = None
    action_type: str | None = None
    name: str = Field(...)
    description: str | None = None
    input_schema: Any | None = Field(default=None)
    input_json_schema: Any | dict[str, Any] | None = Field(
        default=None, description='A JSON Schema Draft 7 (http://json-schema.org/draft-07/schema) object.'
    )
    output_schema: Any | None = Field(default=None)
    output_json_schema: Any | None = Field(
        default=None, description='A JSON Schema Draft 7 (http://json-schema.org/draft-07/schema) object.'
    )
    stream_schema: Any | None = Field(default=None)
    metadata: Metadata | None = None

genkit.plugin_api.ActionKind

Bases: StrEnum

Types of actions that can be registered.

Source code in packages/genkit/src/genkit/_core/_action.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class ActionKind(StrEnum):
    """Types of actions that can be registered."""

    BACKGROUND_MODEL = 'background-model'
    CANCEL_OPERATION = 'cancel-operation'
    CHECK_OPERATION = 'check-operation'
    CUSTOM = 'custom'
    DYNAMIC_ACTION_PROVIDER = 'dynamic-action-provider'
    EMBEDDER = 'embedder'
    EVALUATOR = 'evaluator'
    EXECUTABLE_PROMPT = 'executable-prompt'
    FLOW = 'flow'
    INDEXER = 'indexer'
    MODEL = 'model'
    PROMPT = 'prompt'
    RERANKER = 'reranker'
    RESOURCE = 'resource'
    RETRIEVER = 'retriever'
    TOOL = 'tool'
    UTIL = 'util'

genkit.plugin_api.ActionRunContext

Execution context for an action.

Provides read-only access to action context (auth, metadata) and streaming support.

Source code in packages/genkit/src/genkit/_core/_action.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
class ActionRunContext:
    """Execution context for an action.

    Provides read-only access to action context (auth, metadata) and streaming support.
    """

    def __init__(
        self,
        context: dict[str, object] | None = None,
        streaming_callback: StreamingCallback | None = None,
    ) -> None:
        self._context: dict[str, object] = context if context is not None else {}
        self._streaming_callback = streaming_callback

    @property
    def context(self) -> dict[str, object]:
        return self._context

    @property
    def is_streaming(self) -> bool:
        """Returns True if a streaming callback is registered."""
        return self._streaming_callback is not None

    @property
    def streaming_callback(self) -> StreamingCallback | None:
        """Returns the streaming callback, if any.

        Use this when you need to pass the callback to another action.
        For sending chunks directly, use send_chunk() instead.
        """
        return self._streaming_callback

    def send_chunk(self, chunk: object) -> None:
        """Send a streaming chunk to the client.

        Args:
            chunk: The chunk data to stream.
        """
        if self._streaming_callback is not None:
            self._streaming_callback(chunk)

    @staticmethod
    def _current_context() -> dict[str, object] | None:
        return _action_context.get(None)

is_streaming property

Returns True if a streaming callback is registered.

streaming_callback property

Returns the streaming callback, if any.

Use this when you need to pass the callback to another action. For sending chunks directly, use send_chunk() instead.

send_chunk(chunk)

Send a streaming chunk to the client.

Parameters:

Name Type Description Default
chunk object

The chunk data to stream.

required
Source code in packages/genkit/src/genkit/_core/_action.py
339
340
341
342
343
344
345
346
def send_chunk(self, chunk: object) -> None:
    """Send a streaming chunk to the client.

    Args:
        chunk: The chunk data to stream.
    """
    if self._streaming_callback is not None:
        self._streaming_callback(chunk)

genkit.plugin_api.StatusCodes

Bases: IntEnum

gRPC-style status codes. See _STATUS_CODE_MAP for HTTP mappings.

Source code in packages/genkit/src/genkit/_core/_error.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class StatusCodes(IntEnum):
    """gRPC-style status codes. See _STATUS_CODE_MAP for HTTP mappings."""

    OK = 0
    CANCELLED = 1
    UNKNOWN = 2
    INVALID_ARGUMENT = 3
    DEADLINE_EXCEEDED = 4
    NOT_FOUND = 5
    ALREADY_EXISTS = 6
    PERMISSION_DENIED = 7
    RESOURCE_EXHAUSTED = 8
    FAILED_PRECONDITION = 9
    ABORTED = 10
    OUT_OF_RANGE = 11
    UNIMPLEMENTED = 12
    INTERNAL = 13
    UNAVAILABLE = 14
    DATA_LOSS = 15
    UNAUTHENTICATED = 16

genkit.plugin_api.StatusName = Literal['OK', 'CANCELLED', 'UNKNOWN', 'INVALID_ARGUMENT', 'DEADLINE_EXCEEDED', 'NOT_FOUND', 'ALREADY_EXISTS', 'PERMISSION_DENIED', 'UNAUTHENTICATED', 'RESOURCE_EXHAUSTED', 'FAILED_PRECONDITION', 'ABORTED', 'OUT_OF_RANGE', 'UNIMPLEMENTED', 'INTERNAL', 'UNAVAILABLE', 'DATA_LOSS'] module-attribute

genkit.plugin_api.GenkitError

Bases: Exception

Base error class for Genkit errors.

Source code in packages/genkit/src/genkit/_core/_error.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class GenkitError(Exception):
    """Base error class for Genkit errors."""

    def __init__(
        self,
        *,
        message: str,
        status: StatusName | None = None,
        cause: Exception | None = None,
        details: Any = None,  # noqa: ANN401
        trace_id: str | None = None,
        source: str | None = None,
    ) -> None:
        """Initialize a GenkitError.

        Args:
            message: The error message.
            status: The status name for this error.
            cause: The underlying exception that caused this error.
            details: Optional detail information.
            trace_id: A unique identifier for tracing the action execution.
            source: Optional source of the error.
        """
        temp_status: StatusName
        if status:
            temp_status = status
        elif isinstance(cause, GenkitError):
            temp_status = cause.status
        else:
            temp_status = 'INTERNAL'
        self.status: StatusName = temp_status
        self.http_code: int = http_status_code(temp_status)

        source_prefix = f'{source}: ' if source else ''
        super().__init__(f'{source_prefix}{self.status}: {message}')
        self.original_message: str = message

        if not details:
            details = {}
        if 'stack' not in details:
            details['stack'] = get_error_stack(cause if cause else self)
        if 'trace_id' not in details and trace_id:
            details['trace_id'] = trace_id

        self.details: Any = details
        self.source: str | None = source
        self.trace_id: str | None = trace_id
        self.cause: Exception | None = cause

    def to_callable_serializable(self) -> HttpErrorWireFormat:
        """Returns a JSON-serializable representation of this object.

        Returns:
            An HttpErrorWireFormat model instance.
        """
        # This error type is used by 3P authors with the field "details",
        # but the actual Callable protocol value is "details"
        return HttpErrorWireFormat(
            details=self.details,
            status=StatusCodes[self.status].name,
            message=repr(self.cause) if self.cause else self.original_message,
        )

    def to_serializable(self) -> ReflectionError:
        """Returns a JSON-serializable representation of this object.

        Returns:
            A ReflectionError model instance.
        """
        return ReflectionError(
            details=ReflectionErrorDetails(**self.details) if self.details else None,
            code=StatusCodes[self.status].value,
            message=f'{self.original_message}: {repr(self.cause)}' if self.cause else self.original_message,
        )

__init__(*, message, status=None, cause=None, details=None, trace_id=None, source=None)

Initialize a GenkitError.

Parameters:

Name Type Description Default
message str

The error message.

required
status StatusName | None

The status name for this error.

None
cause Exception | None

The underlying exception that caused this error.

None
details Any

Optional detail information.

None
trace_id str | None

A unique identifier for tracing the action execution.

None
source str | None

Optional source of the error.

None
Source code in packages/genkit/src/genkit/_core/_error.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def __init__(
    self,
    *,
    message: str,
    status: StatusName | None = None,
    cause: Exception | None = None,
    details: Any = None,  # noqa: ANN401
    trace_id: str | None = None,
    source: str | None = None,
) -> None:
    """Initialize a GenkitError.

    Args:
        message: The error message.
        status: The status name for this error.
        cause: The underlying exception that caused this error.
        details: Optional detail information.
        trace_id: A unique identifier for tracing the action execution.
        source: Optional source of the error.
    """
    temp_status: StatusName
    if status:
        temp_status = status
    elif isinstance(cause, GenkitError):
        temp_status = cause.status
    else:
        temp_status = 'INTERNAL'
    self.status: StatusName = temp_status
    self.http_code: int = http_status_code(temp_status)

    source_prefix = f'{source}: ' if source else ''
    super().__init__(f'{source_prefix}{self.status}: {message}')
    self.original_message: str = message

    if not details:
        details = {}
    if 'stack' not in details:
        details['stack'] = get_error_stack(cause if cause else self)
    if 'trace_id' not in details and trace_id:
        details['trace_id'] = trace_id

    self.details: Any = details
    self.source: str | None = source
    self.trace_id: str | None = trace_id
    self.cause: Exception | None = cause

to_callable_serializable()

Returns a JSON-serializable representation of this object.

Returns:

Type Description
HttpErrorWireFormat

An HttpErrorWireFormat model instance.

Source code in packages/genkit/src/genkit/_core/_error.py
214
215
216
217
218
219
220
221
222
223
224
225
226
def to_callable_serializable(self) -> HttpErrorWireFormat:
    """Returns a JSON-serializable representation of this object.

    Returns:
        An HttpErrorWireFormat model instance.
    """
    # This error type is used by 3P authors with the field "details",
    # but the actual Callable protocol value is "details"
    return HttpErrorWireFormat(
        details=self.details,
        status=StatusCodes[self.status].name,
        message=repr(self.cause) if self.cause else self.original_message,
    )

to_serializable()

Returns a JSON-serializable representation of this object.

Returns:

Type Description
ReflectionError

A ReflectionError model instance.

Source code in packages/genkit/src/genkit/_core/_error.py
228
229
230
231
232
233
234
235
236
237
238
def to_serializable(self) -> ReflectionError:
    """Returns a JSON-serializable representation of this object.

    Returns:
        A ReflectionError model instance.
    """
    return ReflectionError(
        details=ReflectionErrorDetails(**self.details) if self.details else None,
        code=StatusCodes[self.status].value,
        message=f'{self.original_message}: {repr(self.cause)}' if self.cause else self.original_message,
    )

genkit.plugin_api.GENKIT_CLIENT_HEADER = f'genkit-python/{GENKIT_VERSION}' module-attribute

genkit.plugin_api.GENKIT_VERSION = '0.3.2' module-attribute

genkit.plugin_api.loop_local_client(factory)

Return a getter that caches one resource instance per event loop.

Source code in packages/genkit/src/genkit/_core/_loop_cache.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def _loop_local_client(factory: Callable[[], T]) -> Callable[[], T]:
    """Return a getter that caches one resource instance per event loop."""
    by_loop: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, T] = weakref.WeakKeyDictionary()
    lock = threading.Lock()

    def _get() -> T:
        loop = asyncio.get_running_loop()
        with lock:
            existing = by_loop.get(loop)
            if existing is not None:
                return existing
            created = factory()
            by_loop[loop] = created
            return created

    return _get

genkit.plugin_api.tracer = trace_api.get_tracer('genkit-tracer', 'v1') module-attribute

genkit.plugin_api.add_custom_exporter(exporter, name='last')

Adds custom span exporter to current tracer provider.

Parameters:

Name Type Description Default
exporter SpanExporter | None

Custom or dedicated span exporter.

required
name str

Name of the span exporter. Only for logging purposes.

'last'
Source code in packages/genkit/src/genkit/_core/_tracing.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def add_custom_exporter(exporter: SpanExporter | None, name: str = 'last') -> None:
    """Adds custom span exporter to current tracer provider.

    Args:
        exporter: Custom or dedicated span exporter.
        name: Name of the span exporter. Only for logging purposes.
    """
    current_provider = init_provider()

    try:
        if exporter is None:
            logger.warn(f'{name} exporter is None')
            return

        processor = create_span_processor(exporter)
        current_provider.add_span_processor(processor)
        logger.debug(f'{name} exporter added successfully.')
    except Exception:
        logger.error(f'tracing.add_custom_exporter: failed to add exporter {name}')
        logger.exception('Failed to add custom exporter')

genkit.plugin_api.AdjustingTraceExporter

Bases: SpanExporter

Wraps a SpanExporter to redact PII and enhance spans for cloud plugins (GCP, AWS).

Source code in packages/genkit/src/genkit/_core/_trace/_adjusting_exporter.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class AdjustingTraceExporter(SpanExporter):
    """Wraps a SpanExporter to redact PII and enhance spans for cloud plugins (GCP, AWS)."""

    REDACTED: ClassVar[str] = '<redacted>'

    def __init__(
        self,
        exporter: SpanExporter,
        log_input_and_output: bool = False,
        project_id: str | None = None,
        error_handler: Callable[[Exception], None] | None = None,
    ) -> None:
        self._exporter = exporter
        self._log_input_and_output = log_input_and_output
        self._project_id = project_id
        self._error_handler = error_handler

    @property
    def project_id(self) -> str | None:
        return self._project_id

    @property
    def log_input_and_output(self) -> bool:
        return self._log_input_and_output

    @override
    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        adjusted = [self._adjust(span) for span in spans]
        try:
            return self._exporter.export(adjusted)
        except Exception as e:
            if self._error_handler:
                self._error_handler(e)
            raise

    @override
    def shutdown(self) -> None:
        self._exporter.shutdown()

    @override
    def force_flush(self, timeout_millis: int = 30000) -> bool:
        if hasattr(self._exporter, 'force_flush'):
            return self._exporter.force_flush(timeout_millis)
        return True

    def _adjust(self, span: ReadableSpan) -> ReadableSpan:
        """Apply all adjustments to a span."""
        span = self._redact_pii(span)
        span = self._mark_error(span)
        span = self._mark_failure_source(span)
        span = self._mark_feature(span)
        span = self._mark_model(span)
        span = self._normalize_labels(span)
        return span

    def _redact_pii(self, span: ReadableSpan) -> ReadableSpan:
        if self._log_input_and_output:
            return span
        attrs = _copy_attrs(span)
        keys_to_redact = [k for k in ('genkit:input', 'genkit:output') if k in attrs]
        if not keys_to_redact:
            return span
        for key in keys_to_redact:
            attrs[key] = self.REDACTED
        return RedactedSpan(span, attrs)

    def _mark_error(self, span: ReadableSpan) -> ReadableSpan:
        if not span.status or span.status.status_code != StatusCode.ERROR:
            return span
        attrs = _copy_attrs(span)
        attrs['/http/status_code'] = '599'
        return RedactedSpan(span, attrs)

    def _mark_failure_source(self, span: ReadableSpan) -> ReadableSpan:
        attrs = _copy_attrs(span)
        if not attrs.get('genkit:isFailureSource'):
            return span
        attrs['genkit:failedSpan'] = attrs.get('genkit:name', '')
        attrs['genkit:failedPath'] = attrs.get('genkit:path', '')
        return RedactedSpan(span, attrs)

    def _mark_feature(self, span: ReadableSpan) -> ReadableSpan:
        attrs = _copy_attrs(span)
        if not attrs.get('genkit:isRoot') or not attrs.get('genkit:name'):
            return span
        attrs['genkit:feature'] = attrs['genkit:name']
        return RedactedSpan(span, attrs)

    def _mark_model(self, span: ReadableSpan) -> ReadableSpan:
        attrs = _copy_attrs(span)
        if attrs.get('genkit:metadata:subtype') != 'model' or not attrs.get('genkit:name'):
            return span
        attrs['genkit:model'] = attrs['genkit:name']
        return RedactedSpan(span, attrs)

    def _normalize_labels(self, span: ReadableSpan) -> ReadableSpan:
        attrs = _copy_attrs(span)
        normalized = {k.replace(':', '/'): v for k, v in attrs.items()}
        return RedactedSpan(span, normalized)

genkit.plugin_api.RedactedSpan

Bases: ReadableSpan

A span wrapper that overrides attributes while delegating everything else.

Source code in packages/genkit/src/genkit/_core/_trace/_adjusting_exporter.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class RedactedSpan(ReadableSpan):
    """A span wrapper that overrides attributes while delegating everything else."""

    # pyrefly:ignore[bad-override]
    _attributes: dict[str, Any]

    def __init__(self, span: ReadableSpan, attributes: dict[str, Any]) -> None:
        self._span = span
        self._attributes = attributes

    def __getattr__(self, name: str) -> Any:  # noqa: ANN401
        return getattr(self._span, name)

    @property
    def attributes(self) -> dict[str, Any]:
        """Return the modified attributes."""
        # pyrefly: ignore[bad-return] - dict[str, Any] is compatible with Mapping at runtime
        return self._attributes

attributes property

Return the modified attributes.

genkit.plugin_api.to_display_path(qualified_path)

Convert /{a,t:flow}/{b,t:step} to 'a > b'.

Source code in packages/genkit/src/genkit/_core/_trace/_path.py
61
62
63
64
65
def to_display_path(qualified_path: str) -> str:
    """Convert /{a,t:flow}/{b,t:step} to 'a > b'."""
    if not qualified_path:
        return ''
    return ' > '.join(_PATH_SEGMENT_RE.findall(qualified_path))

genkit.plugin_api.to_json_schema(schema)

Convert a Python type to JSON schema. Pass-through if already a dict.

Source code in packages/genkit/src/genkit/_core/_schema.py
24
25
26
27
28
29
30
31
def to_json_schema(schema: type | dict[str, Any] | str | None) -> dict[str, Any]:
    """Convert a Python type to JSON schema. Pass-through if already a dict."""
    if schema is None:
        return {'type': 'null'}
    if isinstance(schema, dict):
        return schema
    type_adapter = TypeAdapter(schema)
    return type_adapter.json_schema()

genkit.plugin_api.get_cached_client(cache_key, headers=None, timeout=None, **httpx_kwargs)

Get or create a cached httpx.AsyncClient for the current event loop.

Source code in packages/genkit/src/genkit/_core/_http_client.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_cached_client(
    cache_key: str,
    headers: dict[str, str] | None = None,
    timeout: httpx.Timeout | float | None = None,
    **httpx_kwargs: Any,
) -> httpx.AsyncClient:
    """Get or create a cached httpx.AsyncClient for the current event loop."""
    d = _get_store()
    if cache_key not in d or d[cache_key].is_closed:
        if timeout is None:
            timeout = httpx.Timeout(60.0, connect=10.0)
        elif isinstance(timeout, (int, float)):
            timeout = httpx.Timeout(float(timeout))
        d[cache_key] = httpx.AsyncClient(headers=headers or {}, timeout=timeout, **httpx_kwargs)
    return d[cache_key]

genkit.plugin_api.get_callable_json(error)

Get the JSON-serializable representation of an error for callable responses.

Parameters:

Name Type Description Default
error object

The error to convert to JSON.

required

Returns:

Type Description
dict[str, Any]

A dict ready for json.dumps (message, status, details keys).

Source code in packages/genkit/src/genkit/_core/_error.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def get_callable_json(error: object) -> dict[str, Any]:
    """Get the JSON-serializable representation of an error for callable responses.

    Args:
        error: The error to convert to JSON.

    Returns:
        A dict ready for json.dumps (message, status, details keys).
    """
    if isinstance(error, GenkitError):
        wire = error.to_callable_serializable()
    else:
        wire = HttpErrorWireFormat(
            message=str(error),
            status=StatusCodes.INTERNAL.name,
            details={'stack': get_error_stack(error)},
        )
    return wire.model_dump()

genkit.plugin_api.is_dev_environment()

Check if running in development mode (GENKIT_ENV=dev).

Source code in packages/genkit/src/genkit/_core/_environment.py
34
35
36
def is_dev_environment() -> bool:
    """Check if running in development mode (GENKIT_ENV=dev)."""
    return os.getenv(GENKIT_ENV) == GenkitEnvironment.DEV

genkit.plugin_api.model_action_metadata(name, info=None, config_schema=None)

Create ActionMetadata for a model action.

Source code in packages/genkit/src/genkit/_ai/_model.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def model_action_metadata(
    name: str,
    info: dict[str, object] | None = None,
    config_schema: type | dict[str, Any] | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for a model action."""
    info = info if info is not None else {}
    return ActionMetadata(
        action_type=ActionKind.MODEL,
        name=name,
        input_json_schema=to_json_schema(ModelRequest),
        output_json_schema=to_json_schema(ModelResponse),
        metadata={'model': {**info, 'customOptions': to_json_schema(config_schema) if config_schema else None}},
    )

genkit.plugin_api.model_ref(name, namespace=None, info=None, version=None, config=None)

Create a ModelRef, optionally prefixing name with namespace.

Source code in packages/genkit/src/genkit/_ai/_model.py
69
70
71
72
73
74
75
76
77
78
79
80
def model_ref(
    name: str,
    namespace: str | None = None,
    info: ModelInfo | None = None,
    version: str | None = None,
    config: dict[str, object] | None = None,
) -> ModelRef:
    """Create a ModelRef, optionally prefixing name with namespace."""
    # Logic: if (options.namespace && !name.startsWith(options.namespace + '/'))
    final_name = f'{namespace}/{name}' if namespace and not name.startswith(f'{namespace}/') else name

    return ModelRef(name=final_name, info=info, version=version, config=config)

genkit.plugin_api.ModelRef

Bases: BaseModel

Reference to a model with configuration.

Source code in packages/genkit/src/genkit/_core/_model.py
70
71
72
73
74
75
76
77
class ModelRef(BaseModel):
    """Reference to a model with configuration."""

    name: str
    config_schema: object | None = None
    info: object | None = None
    version: str | None = None
    config: dict[str, object] | None = None

genkit.plugin_api.embedder_action_metadata(name, options=None)

Create ActionMetadata for an embedder action.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def embedder_action_metadata(
    name: str,
    options: EmbedderOptions | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for an embedder action."""
    options = options if options is not None else EmbedderOptions()
    embedder_metadata_dict: dict[str, object] = {'embedder': {}}
    embedder_info = cast(dict[str, object], embedder_metadata_dict['embedder'])

    if options.label:
        embedder_info['label'] = options.label

    embedder_info['dimensions'] = options.dimensions

    if options.supports:
        embedder_info['supports'] = options.supports.model_dump(exclude_none=True, by_alias=True)

    embedder_info['customOptions'] = options.config_schema if options.config_schema else None

    return ActionMetadata(
        action_type=ActionKind.EMBEDDER,
        name=name,
        input_json_schema=to_json_schema(EmbedRequest),
        output_json_schema=to_json_schema(EmbedResponse),
        metadata=embedder_metadata_dict,
    )

genkit.plugin_api.embedder_ref(name, config=None, version=None)

Creates an EmbedderRef instance.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
114
115
116
def create_embedder_ref(name: str, config: dict[str, Any] | None = None, version: str | None = None) -> EmbedderRef:
    """Creates an EmbedderRef instance."""
    return EmbedderRef(name=name, config=config, version=version)

genkit.plugin_api.EmbedderRef

Bases: BaseModel

Reference to an embedder with configuration.

Source code in packages/genkit/src/genkit/_ai/_embedding.py
53
54
55
56
57
58
59
60
class EmbedderRef(BaseModel):
    """Reference to an embedder with configuration."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True)

    name: str
    config: Any | None = None
    version: str | None = None

genkit.plugin_api.evaluator_action_metadata(name, config_schema=None)

Create ActionMetadata for an evaluator action.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
73
74
75
76
77
78
79
80
81
82
83
84
def evaluator_action_metadata(
    name: str,
    config_schema: type | dict[str, Any] | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for an evaluator action."""
    return ActionMetadata(
        action_type=ActionKind.EVALUATOR,
        name=name,
        input_json_schema=to_json_schema(EvalRequest),
        output_json_schema=to_json_schema(list[EvalFnResponse]),
        metadata={'evaluator': {'customOptions': to_json_schema(config_schema) if config_schema else None}},
    )

genkit.plugin_api.evaluator_ref(name, config_schema=None)

Create an EvaluatorRef.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
68
69
70
def evaluator_ref(name: str, config_schema: dict[str, object] | None = None) -> EvaluatorRef:
    """Create an EvaluatorRef."""
    return EvaluatorRef(name=name, config_schema=config_schema)

genkit.plugin_api.EvaluatorRef

Bases: BaseModel

Reference to an evaluator.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
59
60
61
62
63
64
65
class EvaluatorRef(BaseModel):
    """Reference to an evaluator."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True, alias_generator=to_camel)

    name: str
    config_schema: dict[str, object] | None = None

genkit.plugin_api.ContextProvider = Callable[[RequestData[T]], dict[str, Any] | Awaitable[dict[str, Any]]] module-attribute

Middleware can read request data and add information to the context that will be passed to the Action. If middleware throws an error, that error will fail the request and the Action will not be called.

Expected cases should return a PublicError, which allows the request handler to know what data is safe to return to end users. Middleware can provide validation in addition to parsing. For example, an auth middleware can have policies for validating auth in addition to passing auth context to the Action.

genkit.plugin_api.RequestData dataclass

Bases: Generic[T]

A universal type that request handling extensions.

For example, Flask can map their request to this type. This allows ContextProviders to build consistent interfaces on any web framework.

Source code in packages/genkit/src/genkit/_core/_context.py
36
37
38
39
40
41
42
43
44
45
@dataclass
class RequestData(Generic[T]):
    """A universal type that request handling extensions.

    For example, Flask can map their request to this type.  This allows
    ContextProviders to build consistent interfaces on any web framework.
    """

    request: T
    metadata: ContextMetadata | None = None

genkit.evaluator

genkit.evaluator.EvalRequest

Bases: GenkitModel

Model for evalrequest data.

Source code in packages/genkit/src/genkit/_core/_typing.py
148
149
150
151
152
153
154
class EvalRequest(GenkitModel):
    """Model for evalrequest data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    dataset: list[BaseDataPoint] = Field(...)
    eval_run_id: str = Field(...)
    options: Any | None = Field(default=None)

genkit.evaluator.EvalResponse

Bases: RootModel[list[EvalFnResponse]]

Root model for evalresponse.

Source code in packages/genkit/src/genkit/_core/_typing.py
900
901
902
903
class EvalResponse(RootModel[list[EvalFnResponse]]):
    """Root model for evalresponse."""

    root: list[EvalFnResponse]

genkit.evaluator.EvalFnResponse

Bases: GenkitModel

Model for evalfnresponse data.

Source code in packages/genkit/src/genkit/_core/_typing.py
137
138
139
140
141
142
143
144
145
class EvalFnResponse(GenkitModel):
    """Model for evalfnresponse data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    sample_index: float | None = None
    test_case_id: str = Field(...)
    trace_id: str | None = None
    span_id: str | None = None
    evaluation: Score = Field(...)

genkit.evaluator.Score

Bases: GenkitModel

Model for score data.

Source code in packages/genkit/src/genkit/_core/_typing.py
157
158
159
160
161
162
163
164
165
class Score(GenkitModel):
    """Model for score data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    id: str | None = None
    score: bool | float | str | None = Field(default=None)
    status: EvalStatusEnum | None = None
    error: str | None = None
    details: Details | None = None

genkit.evaluator.Details

Bases: GenkitModel

Model for details data.

Source code in packages/genkit/src/genkit/_core/_typing.py
767
768
769
770
771
class Details(GenkitModel):
    """Model for details data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='allow', populate_by_name=True)
    reasoning: str | None = None

genkit.evaluator.BaseEvalDataPoint

Bases: GenkitModel

Model for baseevaldatapoint data.

Source code in packages/genkit/src/genkit/_core/_typing.py
125
126
127
128
129
130
131
132
133
134
class BaseEvalDataPoint(GenkitModel):
    """Model for baseevaldatapoint data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    input: Any | None = Field(default=None)
    output: Any | None = Field(default=None)
    context: list[Any] | None = None
    reference: Any | None = Field(default=None)
    test_case_id: str = Field(...)
    trace_ids: list[str] | None = None

genkit.evaluator.BaseDataPoint

Bases: GenkitModel

Model for basedatapoint data.

Source code in packages/genkit/src/genkit/_core/_typing.py
113
114
115
116
117
118
119
120
121
122
class BaseDataPoint(GenkitModel):
    """Model for basedatapoint data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(alias_generator=to_camel, extra='forbid', populate_by_name=True)
    input: Any | None = Field(default=None)
    output: Any | None = Field(default=None)
    context: list[Any] | None = None
    reference: Any | None = Field(default=None)
    test_case_id: str | None = None
    trace_ids: list[str] | None = None

genkit.evaluator.EvalStatusEnum

Bases: StrEnum

EvalStatusEnum data type class.

Source code in packages/genkit/src/genkit/_core/_typing.py
37
38
39
40
41
42
class EvalStatusEnum(StrEnum):
    """EvalStatusEnum data type class."""

    UNKNOWN = 'UNKNOWN'
    PASS = 'PASS'
    FAIL = 'FAIL'

genkit.evaluator.evaluator_action_metadata(name, config_schema=None)

Create ActionMetadata for an evaluator action.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
73
74
75
76
77
78
79
80
81
82
83
84
def evaluator_action_metadata(
    name: str,
    config_schema: type | dict[str, Any] | None = None,
) -> ActionMetadata:
    """Create ActionMetadata for an evaluator action."""
    return ActionMetadata(
        action_type=ActionKind.EVALUATOR,
        name=name,
        input_json_schema=to_json_schema(EvalRequest),
        output_json_schema=to_json_schema(list[EvalFnResponse]),
        metadata={'evaluator': {'customOptions': to_json_schema(config_schema) if config_schema else None}},
    )

genkit.evaluator.evaluator_ref(name, config_schema=None)

Create an EvaluatorRef.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
68
69
70
def evaluator_ref(name: str, config_schema: dict[str, object] | None = None) -> EvaluatorRef:
    """Create an EvaluatorRef."""
    return EvaluatorRef(name=name, config_schema=config_schema)

genkit.evaluator.EvaluatorRef

Bases: BaseModel

Reference to an evaluator.

Source code in packages/genkit/src/genkit/_ai/_evaluator.py
59
60
61
62
63
64
65
class EvaluatorRef(BaseModel):
    """Reference to an evaluator."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra='forbid', populate_by_name=True, alias_generator=to_camel)

    name: str
    config_schema: dict[str, object] | None = None