Friday, July 25, 2025

What is a Kubeflow Pipeline (KFP)?

Yes, this is a single pipeline execution in Kubeflow Pipelines (KFP).

Let’s break down how it works:

How KFP Pipelines Work

In KFP, a "pipeline" defines a workflow composed of several "steps" (called components or ops); executing it produces a single pipeline run.

Each component can be implemented as a Python function or container, and these are chained together to define data flow.

Your Pipeline Structure

In your provided code, the pipeline function calls:

process_data – handles data processing, outputs processed data.

ingest_data – takes the output from process_data and ingests it into the datastore.

Both steps are part of the same pipeline run/execution.
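
For reference, a two-step pipeline like the one described might look roughly like this with the KFP v2 SDK. This is a minimal sketch: the component bodies, the pipeline name, and the string "output path" convention are placeholders, not your actual code.

from kfp import dsl

@dsl.component
def process_data(source_path: str) -> str:
    # ... your data-processing logic; returns the processed-data location ...
    return source_path + "/processed"

@dsl.component
def ingest_data(processed_path: str):
    # ... your ingestion logic, reading the processed data ...
    print(f"Ingesting {processed_path}")

@dsl.pipeline(name="process-and-ingest")
def my_pipeline(source_path: str):
    process_task = process_data(source_path=source_path)
    # ingest_data runs after process_data because it consumes its output
    ingest_data(processed_path=process_task.output)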

Execution Flow

When you trigger this pipeline (e.g., via the KFP UI or API), Kubeflow schedules and runs "process_data" first.


Once "process_data" finishes (and produces its output), "ingest_data" starts, using the output from the previous step.


Both steps are executed as part of a single pipeline run with the specified parameters.


The entire workflow (from processing to ingestion) is considered one pipeline execution.


In Summary

All steps inside a pipeline function are executed as a single pipeline run.


Each call to a component (like process_data and ingest_data) becomes an "operation" (step) in the pipeline’s Directed Acyclic Graph (DAG).


Their order and data passing are controlled by their arrangement (and dependencies, e.g., ingest_data uses the output of process_data).


Visual Representation (Simplified)

text

[process_data] ---> [ingest_data]

      (step 1)           (step 2)

   (both belong to the SAME pipeline run)

In summary:

Even though your pipeline calls two separate components, the whole process—from data processing to ingestion—is executed as one, single pipeline execution in KFP. All the steps defined in the pipeline function make up a single workflow.

Thursday, July 24, 2025

What is the difference between Vertex AI Vector Search and Vertex AI Search?

Vertex AI Search and Vector Search are related but represent different levels of abstraction within the Google Cloud ecosystem for building AI-powered search and retrieval solutions.

Vector Search refers to the underlying technology and managed service for performing efficient similarity searches on large datasets of vector embeddings.

It is a core component that powers various applications requiring semantic understanding, such as recommendation engines, semantic search, content discovery, and more.

Vector Search provides the infrastructure for storing, indexing, and querying vector embeddings, which are numerical representations of data (text, images, audio, etc.) that capture their meaning or context.

Developers can directly interact with Vector Search to build custom applications that leverage vector similarity search.
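
For example, querying an existing Vector Search index for nearest neighbors might look roughly like this with the Vertex AI SDK. This is a sketch: the endpoint resource name, deployed index ID, and embedding values are placeholders.

from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project-id", location="us-central1")

# Placeholder resource name of an already-deployed index endpoint
index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
    index_endpoint_name="projects/123/locations/us-central1/indexEndpoints/456"
)

# Query with a precomputed embedding (dummy values here)
neighbors = index_endpoint.find_neighbors(
    deployed_index_id="my_deployed_index",  # placeholder
    queries=[[0.1, 0.2, 0.3]],              # one query embedding
    num_neighbors=5,
)
print(neighbors)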

Vertex AI Search is a higher-level, out-of-the-box solution built on top of Google's search technologies, including Vector Search.

It provides a comprehensive platform for building enterprise-grade search engines with features like retrieval-augmented generation (RAG), automatic embedding fine-tuning, and connectors to various data sources.

Vertex AI Search simplifies the process of creating and deploying search experiences, offering a more managed and integrated approach compared to building a solution from scratch using raw Vector Search.

It aims to provide Google-quality search capabilities for websites, applications, and internal knowledge bases, often incorporating generative AI features for more intelligent responses.
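
By contrast, querying a Vertex AI Search data store feels closer to calling a ready-made search engine. A rough sketch using the Discovery Engine client library (the project ID, data store ID, and serving config path below are placeholders and may need adjusting to your setup):

from google.cloud import discoveryengine_v1 as discoveryengine

client = discoveryengine.SearchServiceClient()

# Placeholder serving config for an existing Vertex AI Search data store
serving_config = (
    "projects/your-gcp-project-id/locations/global/"
    "collections/default_collection/dataStores/your-data-store-id/"
    "servingConfigs/default_config"
)

response = client.search(
    discoveryengine.SearchRequest(
        serving_config=serving_config,
        query="How do I reset my password?",
        page_size=5,
    )
)
for result in response.results:
    print(result.document.id)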

In essence, Vector Search is a fundamental building block, a highly performant vector database service, while Vertex AI Search is a complete, managed solution that utilizes Vector Search and other Google technologies to deliver ready-to-use search capabilities for enterprises. Developers can choose to use Vector Search directly for highly customized or niche use cases, or opt for Vertex AI Search for a more streamlined and feature-rich search engine experience.



Monday, July 21, 2025

Steps to enable Google Vertex AI Engine

Here's how to set up your credentials so your agent can run on Vertex AI Agent Engine:

1. Set Up Application Default Credentials (ADC)

The easiest and most recommended way to set up ADC for local development is by using the gcloud CLI.

Steps:

Install Google Cloud SDK: If you haven't already, install the Google Cloud SDK. Follow the instructions here: https://cloud.google.com/sdk/docs/install

Initialize the gcloud CLI:

Bash

gcloud init

This command will guide you through setting up your default project and zone/region. Make sure to select the Google Cloud project where your Vertex AI resources are located.

Authenticate Application Default Credentials:

Bash

gcloud auth application-default login

This command will open a web browser, prompt you to log in with your Google account, and grant access to the Google Cloud SDK. Once authorized, it stores your credentials in a well-known location on your local file system (~/.config/gcloud/application_default_credentials.json on Linux/macOS, or %APPDATA%\gcloud\application_default_credentials.json on Windows).

These are the credentials that your Python application (and the vertexai library) will automatically pick up.


2. Verify Your Project Configuration

Ensure that your code is configured to use the correct Google Cloud project ID. While ADC will pick up credentials, you often need to explicitly tell Vertex AI which project to operate within.


You likely have a config.py file or similar where you define your Google Cloud project ID and region. Make sure these are accurate.


Example (from config.py or similar):


Python


# config.py

class Config:

    PROJECT_ID = "your-gcp-project-id" # Replace with your actual project ID

    REGION = "us-central1" # Or your desired region

    # ... other configurations

And in your agent_on_ai_engine.py (or wherever you initialize Vertex AI):


Python


import vertexai


# Initialize Vertex AI with your project and region

vertexai.init(project="your-gcp-project-id", location="us-central1")


# ... rest of your code to deploy and run the agent

Make sure your-gcp-project-id and us-central1 (or your chosen region) match the project you authenticated with in step 1.


3. Service Account (for Production or Specific Roles)

While gcloud auth application-default login is great for local development, for production environments or if you need your application to run with specific, granular permissions, you should use a service account.


Steps to use a Service Account:


Create a Service Account:


Go to the Google Cloud Console: https://console.cloud.google.com/


Navigate to IAM & Admin > Service Accounts.


Click + CREATE SERVICE ACCOUNT.


Give it a name, ID, and description.


Grant roles: This is critical. For a Vertex AI agent, you'll typically need roles like:


Vertex AI User (roles/aiplatform.user)


Service Account User (roles/iam.serviceAccountUser) - often needed if the service account needs to impersonate other service accounts or run Cloud Functions/Run.


Storage Object Viewer (roles/storage.objectViewer) or Storage Object Admin if your agent needs to read/write from Cloud Storage buckets (e.g., for RAG).


BigQuery Data Viewer / BigQuery Job User if interacting with BigQuery.


Follow the principle of least privilege: grant only the roles absolutely necessary for your agent's functionality.


Click Done.


Generate a JSON Key for the Service Account:


On the Service Accounts page, click on the service account you just created.


Go to the Keys tab.


Click ADD KEY > Create new key.


Select JSON and click CREATE.


A JSON key file will be downloaded to your computer. Keep this file secure! Do not commit it to version control.
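
If you prefer the command line, the same setup can typically be done with gcloud. The service account name and project ID below are placeholders:

Bash

# Create the service account (placeholder name)
gcloud iam service-accounts create my-agent-sa --display-name="Vertex AI agent"

# Grant the Vertex AI User role (repeat for any other roles you need)
gcloud projects add-iam-policy-binding your-gcp-project-id \
  --member="serviceAccount:my-agent-sa@your-gcp-project-id.iam.gserviceaccount.com" \
  --role="roles/aiplatform.user"

# Generate a JSON key file (keep it out of version control)
gcloud iam service-accounts keys create ./my-agent-sa-key.json \
  --iam-account="my-agent-sa@your-gcp-project-id.iam.gserviceaccount.com"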


Set GOOGLE_APPLICATION_CREDENTIALS Environment Variable:


Open your terminal/command prompt.


Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the full path of the downloaded JSON key file.


On Linux/macOS:


Bash


export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"

On Windows (Command Prompt):


DOS


set GOOGLE_APPLICATION_CREDENTIALS="C:\path\to\your-service-account-key.json"

On Windows (PowerShell):


PowerShell


$env:GOOGLE_APPLICATION_CREDENTIALS="C:\path\to\your-service-account-key.json"

This environment variable tells ADC to use this specific key file for authentication. You'll need to set this every time you open a new terminal session, or add it to your shell's profile script (e.g., .bashrc, .zshrc, config.fish).


After performing step 1 (or step 3 if you're using a service account), try running your Agent on the Vertex AI engine again. The google.auth.default() function should now successfully find your credentials.
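
A quick way to confirm that credentials are discoverable is to call google.auth.default() yourself (this assumes the google-auth library, which the Vertex AI SDK already depends on):

Python

import google.auth

# Raises DefaultCredentialsError if no credentials can be found
credentials, project_id = google.auth.default()
print(f"Found credentials for project: {project_id}")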




What's the difference between Vertex AI and ADK?

When choosing between Google's Agent Development Kit (ADK) and Vertex AI for your AI development, it's not really an "either/or" situation. They serve different, complementary purposes, and in many real-world scenarios, you'll likely use both.

Here's a breakdown to help you understand which is "good" for what:

What is Vertex AI?

Vertex AI is Google Cloud's comprehensive machine learning (ML) platform. It's an end-to-end MLOps platform that provides tools and services for the entire ML lifecycle:

Data Preparation: Data labeling, feature store.

Model Training: AutoML (no code ML), custom training (with your own code using frameworks like TensorFlow, PyTorch), hyperparameter tuning.

Model Management: Model Registry for versioning and tracking.

Model Deployment & Serving: Endpoints for online inference, batch prediction.

Monitoring & Governance: Model monitoring for drift, explainability, MLOps pipelines.

Generative AI: Access to Google's large generative models (like Gemini, PaLM) through APIs, fine-tuning capabilities.

When to use Vertex AI:

Traditional ML Workflows: If you're building predictive models (e.g., customer churn, sales forecasting, fraud detection) from structured data (spreadsheets, databases).

Custom Model Training: When you need to train your own custom ML models from scratch or fine-tune existing models (including LLMs) with your specific data.

Scalable MLOps: For managing the entire lifecycle of ML models in production, with features like version control, reproducibility, monitoring, and automated retraining.

Enterprise-Grade Security & Governance: When you need robust security, compliance, and control over your AI assets.

Unified Platform: If you want a single platform to handle all aspects of your ML and AI development, from data to deployment.

Leveraging Google's Infrastructure: When you need the scalability and reliability of Google Cloud's compute resources (GPUs, TPUs).
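
To make the deployment-and-serving side of Vertex AI concrete, here is a minimal sketch of calling a model that has already been deployed to a Vertex AI endpoint. The endpoint ID and the instance payload are placeholders; the instance format depends entirely on your model's input schema.

from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project-id", location="us-central1")

# Placeholder endpoint ID of an already-deployed model
endpoint = aiplatform.Endpoint("projects/123/locations/us-central1/endpoints/456")

# Send an online prediction request (schema is model-specific)
prediction = endpoint.predict(instances=[{"feature_a": 1.0, "feature_b": "x"}])
print(prediction.predictions)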

What is the Agent Development Kit (ADK)?

ADK is an open-source framework specifically designed for building intelligent agents powered by Large Language Models (LLMs). It's built on the same framework that powers Google's internal agent systems. ADK is focused on:

Agentic Capabilities: Reasoning, tool use, memory, multi-turn conversations.

Orchestration: Defining how LLMs interact with tools, retrieve information, and execute complex, multi-step tasks.

Multi-Agent Systems: Building applications where multiple specialized agents collaborate, delegate tasks, and communicate.


Developer Experience: Provides a structured, Pythonic way to define agents, tools, and workflows, with CLI and a web UI for local development and debugging.


Flexibility: Works with various LLMs (Gemini, open-source models, models from other providers via LiteLLM) and integrates with other agent frameworks like LangChain.


When to use ADK:


Building AI Assistants/Co-pilots: If you want to create interactive agents that can understand natural language, answer questions, take actions, or automate tasks.


Tool Use & External Systems: When your agent needs to interact with external APIs, databases, retrieve documents (RAG), run code, or perform specific business logic based on LLM reasoning.


Complex Workflows with LLMs: For tasks that involve dynamic behavior, planning, and execution steps guided by an LLM (e.g., a travel booking agent, a data analysis assistant).


Multi-Agent Coordination: When you envision a system where different AI agents specialize in different tasks and collaborate to achieve a larger goal.


Fast Prototyping & Iteration: ADK is designed for quick development and testing of LLM-powered agent features.


Real-time Interaction: Native support for bidirectional audio and video streaming for human-like conversational experiences.


The Synergy: Using ADK and Vertex AI Together

The "good" choice is often both. They are complementary, not competing, tools:


You can train and fine-tune your custom LLMs or traditional ML models on Vertex AI, and then deploy them as models that your ADK agents can use for their reasoning and decision-making.


An ADK agent can be designed to monitor business metrics and, if certain conditions are met, trigger a Vertex AI Pipeline to retrain an underlying ML model.


Your ADK agent can use tools that call Vertex AI services (e.g., Vertex AI Search for RAG, Vertex AI Vision for image analysis, a deployed custom model endpoint on Vertex AI for specific predictions).


You can deploy your ADK agents to Vertex AI's managed runtime (or Agent Engine, when generally available) for enterprise-grade scalability, monitoring, and MLOps practices.


In summary:


Use Vertex AI when your primary need is training, deploying, and managing machine learning models (including LLMs) at scale, or leveraging a unified platform for MLOps.


Use ADK when your primary need is building intelligent, interactive, and tool-using agents (often powered by LLMs) that can orchestrate complex, dynamic workflows.


If you're building a sophisticated AI application on Google Cloud, you'll likely use Vertex AI as the underlying platform for your models and infrastructure, and ADK as the framework for building the intelligent agentic layer on top of it.

What is Vertex AI Agent Engine?

Vertex AI Agent Engine (formerly known as LangChain on Vertex AI or Vertex AI Reasoning Engine) is a set of services that enables developers to deploy, manage, and scale AI agents in production. Agent Engine handles the infrastructure to scale agents in production so you can focus on creating applications. Vertex AI Agent Engine offers the following services that you can use individually or in combination:

Managed runtime:

Deploy and scale agents with a managed runtime and end-to-end management capabilities.

Customize the agent's container image with build-time installation scripts for system dependencies.

Use security features including VPC-SC compliance and configuration of authentication and IAM.

Access models and tools such as function calling.

Deploy agents built using different Python frameworks (for example, ADK or LangChain).

Context management:

Sessions (Preview): Agent Engine Sessions lets you store individual interactions between users and agents, providing definitive sources for conversation context.

Memory Bank (Preview): Agent Engine Memory Bank lets you store and retrieve information from sessions to personalize agent interactions.

Quality and evaluation (Preview):

Evaluate agent quality with the integrated Gen AI Evaluation service.

Example Store (Preview): Store and dynamically retrieve few-shot examples to improve agent performance.

Optimize agents with Gemini model training runs.

Observability:

Understand agent behavior with Google Cloud Trace (supporting OpenTelemetry), Cloud Monitoring, and Cloud Logging.

Create and deploy on Vertex AI Agent Engine

Note: For a streamlined, IDE-based development and deployment experience with Vertex AI Agent Engine, consider the agent-starter-pack. It provides ready-to-use templates, a built-in UI for experimentation, and simplifies deployment, operations, evaluation, customization, and observability.

The workflow for building an agent on Vertex AI Agent Engine is:

1. Set up the environment: set up your Google Cloud project and install the latest version of the Vertex AI SDK for Python.

2. Develop an agent: develop an agent that can be deployed on Vertex AI Agent Engine.

3. Deploy the agent: deploy the agent to the Vertex AI Agent Engine managed runtime.

4. Use the agent: query the agent by sending an API request.

5. Manage the deployed agent: manage and delete agents that you have deployed to Vertex AI Agent Engine.
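
As a rough illustration of steps 1-3, here is a sketch using the Vertex AI SDK's agent_engines module together with an ADK agent. The project ID, staging bucket, and model name are placeholders, and the exact module paths (such as reasoning_engines.AdkApp) and parameters can vary between SDK versions.

import vertexai
from vertexai import agent_engines
from vertexai.preview import reasoning_engines
from google.adk.agents import LlmAgent

# Step 1: set up the environment (placeholder project and bucket)
vertexai.init(
    project="your-gcp-project-id",
    location="us-central1",
    staging_bucket="gs://your-staging-bucket",
)

# Step 2: develop an agent (a trivial ADK agent here)
root_agent = LlmAgent(
    model="gemini-2.0-flash",
    name="hello_agent",
    instruction="Answer user questions concisely.",
)

# Step 3: deploy it to the Agent Engine managed runtime
remote_app = agent_engines.create(
    agent_engine=reasoning_engines.AdkApp(agent=root_agent),
    requirements=["google-cloud-aiplatform[adk,agent_engines]"],
)

# Steps 4-5: query and manage the deployed agent via remote_app
# (e.g., remote_app.stream_query(...)); exact methods depend on the SDK version.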



Sunday, July 20, 2025

What is a Custom Agent in ADK?

A Custom Agent is essentially any class you create that inherits from google.adk.agents.BaseAgent and implements its core execution logic within the _run_async_impl asynchronous method. You have complete control over how this method calls other agents (sub-agents), manages state, and handles events.

Why Use Them?

While the standard Workflow Agents (SequentialAgent, LoopAgent, ParallelAgent) cover common orchestration patterns, you'll need a Custom agent when your requirements include:

Conditional Logic: Executing different sub-agents or taking different paths based on runtime conditions or the results of previous steps.

Complex State Management: Implementing intricate logic for maintaining and updating state throughout the workflow beyond simple sequential passing.

External Integrations: Incorporating calls to external APIs, databases, or custom libraries directly within the orchestration flow control.

Dynamic Agent Selection: Choosing which sub-agent(s) to run next based on dynamic evaluation of the situation or input.

Unique Workflow Patterns: Implementing orchestration logic that doesn't fit the standard sequential, parallel, or loop structures.

The heart of any custom agent is the _run_async_impl method. This is where you define its unique behavior.

Signature: async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:

Asynchronous Generator: It must be an async def function and return an AsyncGenerator. This allows it to yield events produced by sub-agents or its own logic back to the runner.

ctx (InvocationContext): Provides access to crucial runtime information, most importantly ctx.session.state, which is the primary way to share data between steps orchestrated by your custom agent.

Calling Sub-Agents: You invoke sub-agents (which are typically stored as instance attributes like self.my_llm_agent) using their run_async method and yield their events:

async for event in self.some_sub_agent.run_async(ctx):

    # Optionally inspect or log the event

    yield event # Pass the event up


Managing State: Read from and write to the session state dictionary (ctx.session.state) to pass data between sub-agent calls or make decisions:


# Read data set by a previous agent

previous_result = ctx.session.state.get("some_key")


# Make a decision based on state

if previous_result == "some_value":

    # ... call a specific sub-agent ...

else:

    # ... call another sub-agent ...


# Store a result for a later step (often done via a sub-agent's output_key)

# ctx.session.state["my_custom_result"] = "calculated_value"


Implementing Control Flow: Use standard Python constructs (if/elif/else, for/while loops, try/except) to create sophisticated, conditional, or iterative workflows involving your sub-agents.



Managing Sub-Agents and State

Typically, a custom agent orchestrates other agents (like LlmAgent, LoopAgent, etc.).


Initialization: You usually pass instances of these sub-agents into your custom agent's constructor and store them as instance attributes (e.g., self.story_generator = story_generator_instance). This makes them accessible within the custom agent's core asynchronous execution logic (the _run_async_impl method).

Sub-Agents List: When initializing the BaseAgent using its super() constructor, you should pass a sub_agents list. This list tells the ADK framework about the agents that are part of this custom agent's immediate hierarchy. It's important for framework features like lifecycle management, introspection, and potentially future routing capabilities, even if your core execution logic (_run_async_impl) calls the agents directly via self.xxx_agent. Include the agents that your custom logic directly invokes at the top level.

State: As mentioned, ctx.session.state is the standard way sub-agents (especially LlmAgents using output key) communicate results back to the orchestrator and how the orchestrator passes necessary inputs down.
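
Putting these pieces together, here is a minimal sketch of a custom agent that runs one sub-agent and then conditionally runs a second one. The ConditionalAgent class, its primary/fallback fields, and the "status" state key are illustrative names, not part of ADK.

from typing import AsyncGenerator

from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event


class ConditionalAgent(BaseAgent):
    # Sub-agents are declared as fields because BaseAgent is a Pydantic model
    primary: LlmAgent
    fallback: LlmAgent

    def __init__(self, name: str, primary: LlmAgent, fallback: LlmAgent):
        # Passing sub_agents registers the hierarchy with the framework
        super().__init__(name=name, primary=primary, fallback=fallback,
                         sub_agents=[primary, fallback])

    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        # Always run the primary agent first, forwarding its events
        async for event in self.primary.run_async(ctx):
            yield event
        # Conditional branch based on state written by the primary agent
        if ctx.session.state.get("status") == "failed":
            async for event in self.fallback.run_async(ctx):
                yield event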



Design Pattern Example: StoryFlowAgent

Let's illustrate the power of custom agents with an example pattern: a multi-stage content generation workflow with conditional logic.


Goal: Create a system that generates a story, iteratively refines it through critique and revision, performs final checks, and crucially, regenerates the story if the final tone check fails.


Why Custom? The core requirement driving the need for a custom agent here is the conditional regeneration based on the tone check. Standard workflow agents don't have built-in conditional branching based on the outcome of a sub-agent's task. We need custom logic (if tone == "negative": ...) within the orchestrator.




https://google.github.io/adk-docs/agents/custom-agents/#part-4-instantiating-and-running-the-custom-agent

Saturday, July 19, 2025

What is Serverless Workflow?

Serverless Workflow presents a vendor-neutral, open-source, and entirely community-driven ecosystem tailored for defining and executing DSL-based workflows in the realm of Serverless technology.

The Serverless Workflow DSL is a high-level language that reshapes the terrain of workflow creation, boasting a design that is ubiquitous, intuitive, imperative, and fluent.


Usability

Designed with linguistic fluency, implicit default behaviors, and minimal technical jargon, making workflows accessible to developers with diverse skill levels and enhancing collaboration.


Event driven

Supports event-driven execution and various scheduling options, including CRON expressions and time-based triggers, to respond efficiently to dynamic conditions.


Interoperability

Seamlessly integrates with multiple protocols (HTTP, gRPC, OpenAPI, AsyncAPI), ensuring easy communication with external systems and services, along with support for custom interactions via scripts, containers, or shell commands.


Platform-Agnostic

Serverless Workflow enables developers to build workflows that can operate across diverse platforms and environments, eliminating the need for platform-specific adaptations.


Extensibility

Provides extensible components and supports defining custom functions and extensions, allowing developers to tailor workflows to unique business requirements without compromising compatibility.


Fault tolerant

Offers comprehensive data transformation, validation, and fault tolerance mechanisms, ensuring workflows are robust, reliable, and capable of handling complex processes and failures gracefully.


Async API Example

document:

  dsl: '1.0.0'

  namespace: default

  name: call-asyncapi

  version: '1.0.0'

do:

- findPet:

    call: asyncapi

    with:

      document:

        uri: https://fake.com/docs/asyncapi.json

      operationRef: findPetsByStatus

      server: staging

      message:

        payload:

          petId: ${ .pet.id }

      authentication:

        bearer:

          token: ${ .token }





Container Example

document:

  dsl: '1.0.0'

  namespace: default

  name: run-container

  version: '1.0.0'

do:

  - runContainer:

      run:

        container:

          image: fake-image




Emit Event Example

document:

  dsl: '1.0.0'

  namespace: default

  name: emit

  version: '0.1.0'

do:

  - emitEvent:

      emit:

        event:

          with:

            source: https://petstore.com

            type: com.petstore.order.placed.v1

            data:

              client:

                firstName: Cruella

                lastName: de Vil

              items:

                - breed: dalmatian

                  quantity: 101




For Loop Example

document:

  dsl: '1.0.0'

  namespace: default

  name: for-example

  version: '0.1.0'

do:

  - checkup:

      for:

        each: pet

        in: .pets

        at: index

      while: .vet != null

      do:

        - waitForCheckup:

            listen:

              to:

                one:

                  with:

                    type: com.fake.petclinic.pets.checkup.completed.v2

            output:

              as: '.pets + [{ "id": $pet.id }]'




Fork Example

document:

  dsl: '1.0.0'

  namespace: default

  name: fork-example

  version: '0.1.0'

do:

  - raiseAlarm:

      fork:

        compete: true

        branches:

          - callNurse:

              call: http

              with:

                method: put

                endpoint: https://fake-hospital.com/api/v3/alert/nurses

                body:

                  patientId: ${ .patient.fullName }

                  room: ${ .room.number }

          - callDoctor:

              call: http

              with:

                method: put

                endpoint: https://fake-hospital.com/api/v3/alert/doctor

                body:

                  patientId: ${ .patient.fullName }

                  room: ${ .room.number }



gRPC Example

document:

  dsl: '1.0.0'

  namespace: default

  name: call-grpc

  version: '1.0.0'

do:

  - greet:

      call: grpc

      with:

        proto: 

          endpoint: file://app/greet.proto

        service:

          name: GreeterApi.Greeter

          host: localhost

          port: 5011

        method: SayHello

        arguments:

          name: '${ .user.preferredDisplayName }'





HTTP Example

document:

  dsl: '1.0.0'

  namespace: default

  name: call-http

  version: '1.0.0'

do:

- getPet:

    call: http

    with:

      method: get

      endpoint: https://petstore.swagger.io/v2/pet/{petId}




Listen Event Example

document:

  dsl: '1.0.0'

  namespace: default

  name: listen-to-all

  version: '0.1.0'

do:

  - callDoctor:

      listen:

        to:

          all:

          - with:

              type: com.fake-hospital.vitals.measurements.temperature

              data: ${ .temperature > 38 }

          - with:

              type: com.fake-hospital.vitals.measurements.bpm

              data: ${ .bpm < 60 or .bpm > 100 }





Open API Example

document:

  dsl: '1.0.0'

  namespace: default

  name: call-openapi

  version: '1.0.0'

do:

  - findPet:

      call: openapi

      with:

        document: 

          endpoint: https://petstore.swagger.io/v2/swagger.json

        operationId: findPetsByStatus

        parameters:

          status: available




Raise Error Example

document:

  dsl: '1.0.0'

  namespace: default

  name: raise-not-implemented

  version: '0.1.0'

do: 

  - notImplemented:

      raise:

        error:

          type: https://serverlessworkflow.io/errors/not-implemented

          status: 500

          title: Not Implemented

          detail: ${ "The workflow '\( $workflow.definition.document.name ):\( $workflow.definition.document.version )' is a work in progress and cannot be run yet" }





Script Example

document:

  dsl: '1.0.0'

  namespace: samples

  name: run-script-with-arguments

  version: 0.1.0

do:

  - log:

      run:

        script:

          language: javascript

          arguments:

            message: ${ .message }

          code: >

            console.log(message)





Subflow Example

document:

  dsl: '1.0.0'

  namespace: default

  name: run-subflow

  version: '0.1.0'

do:

  - registerCustomer:

      run:

        workflow:

          namespace: default

          name: register-customer

          version: '0.1.0'

          input:

            customer: .user




Switch Example

document:

  dsl: '1.0.0'

  namespace: default

  name: switch-example

  version: '0.1.0'

do:

  - processOrder:

      switch:

        - case1:

            when: .orderType == "electronic"

            then: processElectronicOrder

        - case2:

            when: .orderType == "physical"

            then: processPhysicalOrder

        - default:

            then: handleUnknownOrderType

  - processElectronicOrder:

      do:

        - validatePayment:

            call: http

            with:

              method: post

              endpoint: https://fake-payment-service.com/validate

        - fulfillOrder:

            call: http

            with:

              method: post

              endpoint: https://fake-fulfillment-service.com/fulfill

      then: exit

  - processPhysicalOrder:

      do:

        - checkInventory:

            call: http

            with:

              method: get

              endpoint: https://fake-inventory-service.com/inventory

        - packItems:

            call: http

            with:

              method: post

              endpoint: https://fake-packaging-service.com/pack

        - scheduleShipping:

            call: http

            with:

              method: post

              endpoint: https://fake-shipping-service.com/schedule

      then: exit

  - handleUnknownOrderType:

      do:

        - logWarning:

            call: http

            with:

              method: post

              endpoint: https://fake-logging-service.com/warn

        - notifyAdmin:

            call: http

            with:

              method: post

              endpoint: https://fake-notification-service.com/notify





Try-Catch Example

document:

  dsl: '1.0.0'

  namespace: default

  name: try-catch

  version: '0.1.0'

do:

  - tryGetPet:

      try:

        - getPet:

            call: http

            with:

              method: get

              endpoint: https://petstore.swagger.io/v2/pet/{petId}

      catch:

        errors:

          with:

            type: https://serverlessworkflow.io/spec/1.0.0/errors/communication

            status: 404

        as: error

        do:

          - notifySupport:

              emit:

                event:

                  with:

                    source: https://petstore.swagger.io

                    type: io.swagger.petstore.events.pets.not-found.v1

                    data: ${ $error }

          - setError:

              set:

                error: $error

              export:

                as: '$context + { error: $error }'

  - buyPet:

      if: $context.error == null

      call: http

      with:

        method: put

        endpoint: https://petstore.swagger.io/v2/pet/{petId}

        body: '${ . + { status: "sold" } }'






Wait Example

document:

  dsl: '1.0.0'

  namespace: default

  name: wait-duration-inline

  version: '0.1.0'

do: 

  - wait30Seconds:

      wait:

        seconds: 30


How to Use Hosted & Tuned Models on Vertex AI

For enterprise-grade scalability, reliability, and integration with Google Cloud's MLOps ecosystem, you can use models deployed to Vertex AI Endpoints. This includes models from Model Garden or your own fine-tuned models.


Integration Method: Pass the full Vertex AI Endpoint resource string (projects/PROJECT_ID/locations/LOCATION/endpoints/ENDPOINT_ID) directly to the model parameter of LlmAgent.


Ensure your environment is configured for Vertex AI:


Authentication: Use Application Default Credentials (ADC):



gcloud auth application-default login

Environment Variables: Set your project and location:



export GOOGLE_CLOUD_PROJECT="YOUR_PROJECT_ID"

export GOOGLE_CLOUD_LOCATION="YOUR_VERTEX_AI_LOCATION" # e.g., us-central1

Enable Vertex Backend: Crucially, ensure the google-genai library targets Vertex AI:



export GOOGLE_GENAI_USE_VERTEXAI=TRUE


Model Garden Deployments


You can deploy various open and proprietary models from the Vertex AI Model Garden to an endpoint.


from google.adk.agents import LlmAgent

from google.genai import types # For config objects


# --- Example Agent using a Llama 3 model deployed from Model Garden ---


# Replace with your actual Vertex AI Endpoint resource name

llama3_endpoint = "projects/YOUR_PROJECT_ID/locations/us-central1/endpoints/YOUR_LLAMA3_ENDPOINT_ID"


agent_llama3_vertex = LlmAgent(

    model=llama3_endpoint,

    name="llama3_vertex_agent",

    instruction="You are a helpful assistant based on Llama 3, hosted on Vertex AI.",

    generate_content_config=types.GenerateContentConfig(max_output_tokens=2048),

    # ... other agent parameters

)


Fine-tuned Model Endpoints


from google.adk.agents import LlmAgent


# --- Example Agent using a fine-tuned Gemini model endpoint ---


# Replace with your fine-tuned model's endpoint resource name

finetuned_gemini_endpoint = "projects/YOUR_PROJECT_ID/locations/us-central1/endpoints/YOUR_FINETUNED_ENDPOINT_ID"


agent_finetuned_gemini = LlmAgent(

    model=finetuned_gemini_endpoint,

    name="finetuned_gemini_agent",

    instruction="You are a specialized assistant trained on specific data.",

    # ... other agent parameters

)


Third-Party Models on Vertex AI (e.g., Anthropic Claude)



Some providers, like Anthropic, make their models available directly through Vertex AI.


Integration Method: Uses the direct model string (e.g., "claude-3-sonnet@20240229"), but requires manual registration within ADK.


Why Registration? ADK's registry automatically recognizes gemini-* strings and standard Vertex AI endpoint strings (projects/.../endpoints/...) and routes them via the google-genai library. For other model types used directly via Vertex AI (like Claude), you must explicitly tell the ADK registry which specific wrapper class (Claude in this case) knows how to handle that model identifier string with the Vertex AI backend.


Setup:


Vertex AI Environment: Ensure the consolidated Vertex AI setup (ADC, Env Vars, GOOGLE_GENAI_USE_VERTEXAI=TRUE) is complete.


Install Provider Library: Install the necessary client library configured for Vertex AI.



pip install "anthropic[vertex]"

Register Model Class: Add this code near the start of your application, before creating an agent using the Claude model string:



# Required for using Claude model strings directly via Vertex AI with LlmAgent

from google.adk.models.anthropic_llm import Claude

from google.adk.models.registry import LLMRegistry


LLMRegistry.register(Claude)


from google.adk.agents import LlmAgent

from google.adk.models.anthropic_llm import Claude # Import needed for registration

from google.adk.models.registry import LLMRegistry # Import needed for registration

from google.genai import types


# --- Register Claude class (do this once at startup) ---

LLMRegistry.register(Claude)


# --- Example Agent using Claude 3 Sonnet on Vertex AI ---


# Standard model name for Claude 3 Sonnet on Vertex AI

claude_model_vertexai = "claude-3-sonnet@20240229"


agent_claude_vertexai = LlmAgent(

    model=claude_model_vertexai, # Pass the direct string after registration

    name="claude_vertexai_agent",

    instruction="You are an assistant powered by Claude 3 Sonnet on Vertex AI.",

    generate_content_config=types.GenerateContentConfig(max_output_tokens=4096),

    # ... other agent parameters

)


How to use different models with ADK

ADK primarily uses two mechanisms for model integration:


Direct String / Registry: For models tightly integrated with Google Cloud (like Gemini models accessed via Google AI Studio or Vertex AI) or models hosted on Vertex AI endpoints. You typically provide the model name or endpoint resource string directly to the LlmAgent. ADK's internal registry resolves this string to the appropriate backend client, often utilizing the google-genai library.

Wrapper Classes: For broader compatibility, especially with models outside the Google ecosystem or those requiring specific client configurations (like models accessed via LiteLLM). You instantiate a specific wrapper class (e.g., LiteLlm) and pass this object as the model parameter to your LlmAgent.



Using Google Gemini Models

This section covers authenticating with Google's Gemini models, either through Google AI Studio for rapid development or Google Cloud Vertex AI for enterprise applications. This is the most direct way to use Google's flagship models within ADK.


Integration Method: Once you are authenticated using one of the below methods, you can pass the model's identifier string directly to the model parameter of LlmAgent.


The google-genai library, used internally by ADK for Gemini models, can connect through either Google AI Studio or Vertex AI.


Model support for voice/video streaming


In order to use voice/video streaming in ADK, you will need to use Gemini models that support the Live API. You can find the model ID(s) that support the Gemini Live API in the documentation:

Google AI Studio: Gemini Live API

Vertex AI: Gemini Live API


Google AI Studio


This is the simplest method and is recommended for getting started quickly.


Authentication Method: API Key

Setup:


Get an API key: Obtain your key from Google AI Studio.

Set environment variables: Create a .env file (Python) or .properties (Java) in your project's root directory and add the following lines. ADK will automatically load this file.



export GOOGLE_API_KEY="YOUR_GOOGLE_API_KEY"

export GOOGLE_GENAI_USE_VERTEXAI=FALSE

(or)


Pass these variables during the model initialization via the Client (see example below).


Google Cloud Vertex AI

For scalable and production-oriented use cases, Vertex AI is the recommended platform. Gemini on Vertex AI supports enterprise-grade features, security, and compliance controls.


Based on your development environment and use case, choose one of the methods below to authenticate. Prerequisites: a Google Cloud project with Vertex AI enabled.


Method A: User Credentials (for Local Development)

Install the gcloud CLI: Follow the official installation instructions.

Log in using ADC: This command opens a browser to authenticate your user account for local development.


gcloud auth application-default login

Set environment variables:



export GOOGLE_CLOUD_PROJECT="YOUR_PROJECT_ID"

export GOOGLE_CLOUD_LOCATION="YOUR_VERTEX_AI_LOCATION" # e.g., us-central1

Explicitly tell the library to use Vertex AI:



export GOOGLE_GENAI_USE_VERTEXAI=TRUE


Models: Find available model IDs in the Vertex AI documentation.




Method B: Vertex AI Express Mode

Vertex AI Express Mode offers a simplified, API-key-based setup for rapid prototyping.


Sign up for Express Mode to get your API key.

Set environment variables:


export GOOGLE_API_KEY="PASTE_YOUR_EXPRESS_MODE_API_KEY_HERE"

export GOOGLE_GENAI_USE_VERTEXAI=TRUE

Method C: Service Account (for Production & Automation)

For deployed applications, a service account is the standard method.


Create a Service Account and grant it the Vertex AI User role.

Provide credentials to your application:

On Google Cloud: If you are running the agent in Cloud Run, GKE, VM or other Google Cloud services, the environment can automatically provide the service account credentials. You don't have to create a key file.

Elsewhere: Create a service account key file and point to it with an environment variable:


export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/keyfile.json"

Instead of the key file, you can also authenticate the service account using Workload Identity. But this is outside the scope of this guide.







from google.adk.agents import LlmAgent


# --- Example using a stable Gemini Flash model ---

agent_gemini_flash = LlmAgent(

    # Use the latest stable Flash model identifier

    model="gemini-2.0-flash",

    name="gemini_flash_agent",

    instruction="You are a fast and helpful Gemini assistant.",

    # ... other agent parameters

)


# --- Example using a powerful Gemini Pro model ---

# Note: Always check the official Gemini documentation for the latest model names,

# including specific preview versions if needed. Preview models might have

# different availability or quota limitations.

agent_gemini_pro = LlmAgent(

    # Use the latest generally available Pro model identifier

    model="gemini-2.5-pro-preview-03-25",

    name="gemini_pro_agent",

    instruction="You are a powerful and knowledgeable Gemini assistant.",

    # ... other agent parameters

)



Using Anthropic models


You can integrate Anthropic's Claude models into your Java ADK applications, either directly using their API key or through a Vertex AI backend, by using the ADK's Claude wrapper class.


For Vertex AI backend, see the Third-Party Models on Vertex AI section.


Prerequisites:


Dependencies:


Anthropic SDK Classes (Transitive): The Java ADK's com.google.adk.models.Claude wrapper relies on classes from Anthropic's official Java SDK. These are typically included as transitive dependencies.

Anthropic API Key:


Obtain an API key from Anthropic. Securely manage this key using a secret manager.


Using Cloud & Proprietary Models via LiteLLM

To access a vast range of LLMs from providers like OpenAI, Anthropic (non-Vertex AI), Cohere, and many others, ADK offers integration through the LiteLLM library.


Integration Method: Instantiate the LiteLlm wrapper class and pass it to the model parameter of LlmAgent.


LiteLLM Overview: LiteLLM acts as a translation layer, providing a standardized, OpenAI-compatible interface to 100+ LLMs.


Install LiteLLM

pip install litellm


Example for OpenAI:



export OPENAI_API_KEY="YOUR_OPENAI_API_KEY"

Example for Anthropic (non-Vertex AI):



export ANTHROPIC_API_KEY="YOUR_ANTHROPIC_API_KEY"

Consult the LiteLLM Providers Documentation for the correct environment variable names for other providers.


Example:



from google.adk.agents import LlmAgent

from google.adk.models.lite_llm import LiteLlm


# --- Example Agent using OpenAI's GPT-4o ---

# (Requires OPENAI_API_KEY)

agent_openai = LlmAgent(

    model=LiteLlm(model="openai/gpt-4o"), # LiteLLM model string format

    name="openai_agent",

    instruction="You are a helpful assistant powered by GPT-4o.",

    # ... other agent parameters

)


# --- Example Agent using Anthropic's Claude Haiku (non-Vertex) ---

# (Requires ANTHROPIC_API_KEY)

agent_claude_direct = LlmAgent(

    model=LiteLlm(model="anthropic/claude-3-haiku-20240307"),

    name="claude_direct_agent",

    instruction="You are an assistant powered by Claude Haiku.",

    # ... other agent parameters

)


Using Open & Local Models via LiteLLM

For maximum control, cost savings, privacy, or offline use cases, you can run open-source models locally or self-host them and integrate them using LiteLLM.


Integration Method: Instantiate the LiteLlm wrapper class, configured to point to your local model server.




Ollama Integration

Ollama allows you to easily run open-source models locally.


Model choice

If your agent relies on tools, make sure that you select a model with tool support from the Ollama website.

For reliable results, we recommend using a decent-sized model with tool support.

The tool support for the model can be checked with the following command:


ollama show mistral-small3.1

  Model

    architecture        mistral3

    parameters          24.0B

    context length      131072

    embedding length    5120

    quantization        Q4_K_M


  Capabilities

    completion

    vision

    tools


You should see tools listed under Capabilities.


You can also look at the template the model is using and tweak it based on your needs.



ollama show --modelfile llama3.2 > model_file_to_modify


For instance, the default template for the above model suggests that the model should always call a function, which may result in an infinite loop of function calls.


Given the following functions, please respond with a JSON for a function call

with its proper arguments that best answers the given prompt.


Respond in the format {"name": function name, "parameters": dictionary of

argument name and its value}. Do not use variables.


You can swap such prompts with a more descriptive one to prevent infinite tool call loops.


Review the user's prompt and the available functions listed below.

First, determine if calling one of these functions is the most appropriate way to respond. A function call is likely needed if the prompt asks for a specific action, requires external data lookup, or involves calculations handled by the functions. If the prompt is a general question or can be answered directly, a function call is likely NOT needed.


If you determine a function call IS required: Respond ONLY with a JSON object in the format {"name": "function_name", "parameters": {"argument_name": "value"}}. Ensure parameter values are concrete, not variables.


If you determine a function call IS NOT required: Respond directly to the user's prompt in plain text, providing the answer or information requested. Do not output any JSON.



Using the ollama_chat provider

Our LiteLLM wrapper can be used to create agents with Ollama models.



root_agent = Agent(

    model=LiteLlm(model="ollama_chat/mistral-small3.1"),

    name="dice_agent",

    description=(

        "hello world agent that can roll a dice of 8 sides and check prime"

        " numbers."

    ),

    instruction="""

      You roll dice and answer questions about the outcome of the dice rolls.

    """,

    tools=[

        roll_die,

        check_prime,

    ],

)
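
For the ollama_chat provider above, LiteLLM reads the Ollama server address from the OLLAMA_API_BASE environment variable. Assuming a local Ollama server on its default port, a typical setup before launching the dev UI looks like:

export OLLAMA_API_BASE="http://localhost:11434"

adk web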


Using the openai provider

Alternatively, openai can be used as the provider name, but this requires setting the OPENAI_API_BASE=http://localhost:11434/v1 and OPENAI_API_KEY=anything environment variables instead of OLLAMA_API_BASE. Note that the API base now ends with /v1.



root_agent = Agent(

    model=LiteLlm(model="openai/mistral-small3.1"),

    name="dice_agent",

    description=(

        "hello world agent that can roll a dice of 8 sides and check prime"

        " numbers."

    ),

    instruction="""

      You roll dice and answer questions about the outcome of the dice rolls.

    """,

    tools=[

        roll_die,

        check_prime,

    ],

)


export OPENAI_API_BASE=http://localhost:11434/v1

export OPENAI_API_KEY=anything

adk web


You can see the request sent to the Ollama server by adding the following in your agent code just after imports.


import litellm

litellm._turn_on_debug()



Request Sent from LiteLLM:

curl -X POST \

http://localhost:11434/api/chat \

-d '{'model': 'mistral-small3.1', 'messages': [{'role': 'system', 'content': ...


Self-Hosted Endpoint (e.g., vLLM)


Tools such as vLLM allow you to host models efficiently and often expose an OpenAI-compatible API endpoint.


Setup:


Deploy Model: Deploy your chosen model using vLLM (or a similar tool). Note the API base URL (e.g., https://your-vllm-endpoint.run.app/v1).

Important for ADK Tools: When deploying, ensure the serving tool supports and enables OpenAI-compatible tool/function calling. For vLLM, this might involve flags like --enable-auto-tool-choice and potentially a specific --tool-call-parser, depending on the model. Refer to the vLLM documentation on Tool Use.



Authentication: Determine how your endpoint handles authentication (e.g., API key, bearer token).


Integration Example:


import subprocess

from google.adk.agents import LlmAgent

from google.adk.models.lite_llm import LiteLlm


# --- Example Agent using a model hosted on a vLLM endpoint ---


# Endpoint URL provided by your vLLM deployment

api_base_url = "https://your-vllm-endpoint.run.app/v1"


# Model name as recognized by *your* vLLM endpoint configuration

model_name_at_endpoint = "hosted_vllm/google/gemma-3-4b-it" # Example from vllm_test.py


# Authentication (Example: using gcloud identity token for a Cloud Run deployment)

# Adapt this based on your endpoint's security

try:

    gcloud_token = subprocess.check_output(

        ["gcloud", "auth", "print-identity-token", "-q"]

    ).decode().strip()

    auth_headers = {"Authorization": f"Bearer {gcloud_token}"}

except Exception as e:

    print(f"Warning: Could not get gcloud token - {e}. Endpoint might be unsecured or require different auth.")

    auth_headers = None # Or handle error appropriately


agent_vllm = LlmAgent(

    model=LiteLlm(

        model=model_name_at_endpoint,

        api_base=api_base_url,

        # Pass authentication headers if needed

        extra_headers=auth_headers

        # Alternatively, if endpoint uses an API key:

        # api_key="YOUR_ENDPOINT_API_KEY"

    ),

    name="vllm_agent",

    instruction="You are a helpful assistant running on a self-hosted vLLM endpoint.",

    # ... other agent parameters

)



What are Common Multi-Agent Patterns using ADK Primitives?

Coordinator/Dispatcher Pattern

Structure: A central LlmAgent (Coordinator) manages several specialized sub_agents.

Goal: Route incoming requests to the appropriate specialist agent.

ADK Primitives Used:

Hierarchy: Coordinator has specialists listed in sub_agents.

Interaction: Primarily uses LLM-Driven Delegation (requires clear descriptions on sub-agents and appropriate instruction on Coordinator) or Explicit Invocation (AgentTool) (Coordinator includes AgentTool-wrapped specialists in its tools).


# Conceptual Code: Coordinator using LLM Transfer

from google.adk.agents import LlmAgent


billing_agent = LlmAgent(name="Billing", description="Handles billing inquiries.")

support_agent = LlmAgent(name="Support", description="Handles technical support requests.")


coordinator = LlmAgent(

    name="HelpDeskCoordinator",

    model="gemini-2.0-flash",

    instruction="Route user requests: Use Billing agent for payment issues, Support agent for technical problems.",

    description="Main help desk router.",

    # allow_transfer=True is often implicit with sub_agents in AutoFlow

    sub_agents=[billing_agent, support_agent]

)

# User asks "My payment failed" -> Coordinator's LLM should call transfer_to_agent(agent_name='Billing')

# User asks "I can't log in" -> Coordinator's LLM should call transfer_to_agent(agent_name='Support')



Sequential Pipeline Pattern


Structure: A SequentialAgent contains sub_agents executed in a fixed order.

Goal: Implement a multi-step process where the output of one step feeds into the next.

ADK Primitives Used:

Workflow: SequentialAgent defines the order.

Communication: Primarily uses Shared Session State. Earlier agents write results (often via output_key), later agents read those results from context.state.


# Conceptual Code: Sequential Data Pipeline

from google.adk.agents import SequentialAgent, LlmAgent


validator = LlmAgent(name="ValidateInput", instruction="Validate the input.", output_key="validation_status")

processor = LlmAgent(name="ProcessData", instruction="Process data if {validation_status} is 'valid'.", output_key="result")

reporter = LlmAgent(name="ReportResult", instruction="Report the result from {result}.")


data_pipeline = SequentialAgent(

    name="DataPipeline",

    sub_agents=[validator, processor, reporter]

)

# validator runs -> saves to state['validation_status']

# processor runs -> reads state['validation_status'], saves to state['result']

# reporter runs -> reads state['result']




Parallel Fan-Out/Gather Pattern

Structure: A ParallelAgent runs multiple sub_agents concurrently, often followed by a later agent (in a SequentialAgent) that aggregates results.

Goal: Execute independent tasks simultaneously to reduce latency, then combine their outputs.

ADK Primitives Used:

Workflow: ParallelAgent for concurrent execution (Fan-Out). Often nested within a SequentialAgent to handle the subsequent aggregation step (Gather).

Communication: Sub-agents write results to distinct keys in Shared Session State. The subsequent "Gather" agent reads multiple state keys.


# Conceptual Code: Parallel Information Gathering

from google.adk.agents import SequentialAgent, ParallelAgent, LlmAgent


fetch_api1 = LlmAgent(name="API1Fetcher", instruction="Fetch data from API 1.", output_key="api1_data")

fetch_api2 = LlmAgent(name="API2Fetcher", instruction="Fetch data from API 2.", output_key="api2_data")


gather_concurrently = ParallelAgent(

    name="ConcurrentFetch",

    sub_agents=[fetch_api1, fetch_api2]

)


synthesizer = LlmAgent(

    name="Synthesizer",

    instruction="Combine results from {api1_data} and {api2_data}."

)


overall_workflow = SequentialAgent(

    name="FetchAndSynthesize",

    sub_agents=[gather_concurrently, synthesizer] # Run parallel fetch, then synthesize

)

# fetch_api1 and fetch_api2 run concurrently, saving to state.

# synthesizer runs afterwards, reading state['api1_data'] and state['api2_data'].




Hierarchical Task Decomposition

Structure: A multi-level tree of agents where higher-level agents break down complex goals and delegate sub-tasks to lower-level agents.

Goal: Solve complex problems by recursively breaking them down into simpler, executable steps.

ADK Primitives Used:

Hierarchy: Multi-level parent_agent/sub_agents structure.

Interaction: Primarily LLM-Driven Delegation or Explicit Invocation (AgentTool) used by parent agents to assign tasks to subagents. Results are returned up the hierarchy (via tool responses or state).


# Conceptual Code: Hierarchical Research Task

from google.adk.agents import LlmAgent

from google.adk.tools import agent_tool


# Low-level tool-like agents

web_searcher = LlmAgent(name="WebSearch", description="Performs web searches for facts.")

summarizer = LlmAgent(name="Summarizer", description="Summarizes text.")


# Mid-level agent combining tools

research_assistant = LlmAgent(

    name="ResearchAssistant",

    model="gemini-2.0-flash",

    description="Finds and summarizes information on a topic.",

    tools=[agent_tool.AgentTool(agent=web_searcher), agent_tool.AgentTool(agent=summarizer)]

)


# High-level agent delegating research

report_writer = LlmAgent(

    name="ReportWriter",

    model="gemini-2.0-flash",

    instruction="Write a report on topic X. Use the ResearchAssistant to gather information.",

    tools=[agent_tool.AgentTool(agent=research_assistant)]

    # Alternatively, could use LLM Transfer if research_assistant is a sub_agent

)

# User interacts with ReportWriter.

# ReportWriter calls ResearchAssistant tool.

# ResearchAssistant calls WebSearch and Summarizer tools.

# Results flow back up.


Review/Critique Pattern (Generator-Critic)


Structure: Typically involves two agents within a SequentialAgent: a Generator and a Critic/Reviewer.

Goal: Improve the quality or validity of generated output by having a dedicated agent review it.

ADK Primitives Used:

Workflow: SequentialAgent ensures generation happens before review.

Communication: Shared Session State (Generator uses output_key to save output; Reviewer reads that state key). The Reviewer might save its feedback to another state key for subsequent steps.



# Conceptual Code: Generator-Critic

from google.adk.agents import SequentialAgent, LlmAgent


generator = LlmAgent(

    name="DraftWriter",

    instruction="Write a short paragraph about subject X.",

    output_key="draft_text"

)


reviewer = LlmAgent(

    name="FactChecker",

    instruction="Review the text in {draft_text} for factual accuracy. Output 'valid' or 'invalid' with reasons.",

    output_key="review_status"

)


# Optional: Further steps based on review_status


review_pipeline = SequentialAgent(

    name="WriteAndReview",

    sub_agents=[generator, reviewer]

)

# generator runs -> saves draft to state['draft_text']

# reviewer runs -> reads state['draft_text'], saves status to state['review_status']




Iterative Refinement Pattern

Structure: Uses a LoopAgent containing one or more agents that work on a task over multiple iterations.

Goal: Progressively improve a result (e.g., code, text, plan) stored in the session state until a quality threshold is met or a maximum number of iterations is reached.

ADK Primitives Used:

Workflow: LoopAgent manages the repetition.

Communication: Shared Session State is essential for agents to read the previous iteration's output and save the refined version.

Termination: The loop typically ends based on max_iterations or a dedicated checking agent setting escalate=True in the Event Actions when the result is satisfactory.


# Conceptual Code: Iterative Code Refinement

from google.adk.agents import LoopAgent, LlmAgent, BaseAgent

from google.adk.events import Event, EventActions

from google.adk.agents.invocation_context import InvocationContext

from typing import AsyncGenerator


# Agent to generate/refine code based on state['current_code'] and state['requirements']

code_refiner = LlmAgent(

    name="CodeRefiner",

    instruction="Read state['current_code'] (if exists) and state['requirements']. Generate/refine Python code to meet requirements. Save to state['current_code'].",

    output_key="current_code" # Overwrites previous code in state

)


# Agent to check if the code meets quality standards

quality_checker = LlmAgent(

    name="QualityChecker",

    instruction="Evaluate the code in state['current_code'] against state['requirements']. Output 'pass' or 'fail'.",

    output_key="quality_status"

)


# Custom agent to check the status and escalate if 'pass'

class CheckStatusAndEscalate(BaseAgent):

    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:

        status = ctx.session.state.get("quality_status", "fail")

        should_stop = (status == "pass")

        yield Event(author=self.name, actions=EventActions(escalate=should_stop))


refinement_loop = LoopAgent(

    name="CodeRefinementLoop",

    max_iterations=5,

    sub_agents=[code_refiner, quality_checker, CheckStatusAndEscalate(name="StopChecker")]

)

# Loop runs: Refiner -> Checker -> StopChecker

# State['current_code'] is updated each iteration.

# Loop stops if QualityChecker outputs 'pass' (leading to StopChecker escalating) or after 5 iterations.



Human-in-the-Loop Pattern


Structure: Integrates human intervention points within an agent workflow.

Goal: Allow for human oversight, approval, correction, or tasks that AI cannot perform.

ADK Primitives Used (Conceptual):

Interaction: Can be implemented using a custom Tool that pauses execution and sends a request to an external system (e.g., a UI, ticketing system) waiting for human input. The tool then returns the human's response to the agent.

Workflow: Could use LLM-Driven Delegation (transfer_to_agent) targeting a conceptual "Human Agent" that triggers the external workflow, or use the custom tool within an LlmAgent.

State/Callbacks: State can hold task details for the human; callbacks can manage the interaction flow.

Note: ADK doesn't have a built-in "Human Agent" type, so this requires custom integration.


# Conceptual Code: Using a Tool for Human Approval

from google.adk.agents import LlmAgent, SequentialAgent

from google.adk.tools import FunctionTool


# --- Assume external_approval_tool exists ---

# This tool would:

# 1. Take details (e.g., request_id, amount, reason).

# 2. Send these details to a human review system (e.g., via API).

# 3. Poll or wait for the human response (approved/rejected).

# 4. Return the human's decision.

# async def external_approval_tool(amount: float, reason: str) -> str: ...
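# A minimal stand-in (an assumption, for illustration only): decide automatically
# instead of contacting a real review system, so the conceptual flow can execute.
async def external_approval_tool(amount: float, reason: str) -> str:
    # A real implementation would submit the request to a human review system
    # and wait (or poll) for the decision before returning it.
    return "approved" if amount < 1000 else "rejected"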

approval_tool = FunctionTool(func=external_approval_tool)


# Agent that prepares the request

prepare_request = LlmAgent(

    name="PrepareApproval",

    instruction="Prepare the approval request details based on user input. Store amount and reason in state.",

    # ... likely sets state['approval_amount'] and state['approval_reason'] ...

)


# Agent that calls the human approval tool

request_approval = LlmAgent(

    name="RequestHumanApproval",

    instruction="Use the external_approval_tool with amount from state['approval_amount'] and reason from state['approval_reason'].",

    tools=[approval_tool],

    output_key="human_decision"

)


# Agent that proceeds based on human decision

process_decision = LlmAgent(

    name="ProcessDecision",

    instruction="Check {human_decision}. If 'approved', proceed. If 'rejected', inform user."

)


approval_workflow = SequentialAgent(

    name="HumanApprovalWorkflow",

    sub_agents=[prepare_request, request_approval, process_decision]

)





references:

https://google.github.io/adk-docs/agents/multi-agents/#c-explicit-invocation-agenttool


What is Agent Development Kit? - Part 1

 What is Agent Development Kit?


Agent Development Kit (ADK) is a flexible and modular framework for developing and deploying AI agents. While optimized for Gemini and the Google ecosystem, ADK is model-agnostic, deployment-agnostic, and is built for compatibility with other frameworks. ADK was designed to make agent development feel more like software development, to make it easier for developers to create, deploy, and orchestrate agentic architectures that range from simple tasks to complex workflows.


Flexible Orchestration


Define workflows using workflow agents (Sequential, Parallel, Loop) for predictable pipelines, or leverage LLM-driven dynamic routing (LlmAgent transfer) for adaptive behavior.


Multi-Agent Architecture

Build modular and scalable applications by composing multiple specialized agents in a hierarchy. Enable complex coordination and delegation.


Rich Tool Ecosystem

Equip agents with diverse capabilities: use pre-built tools (Search, Code Exec), create custom functions, integrate 3rd-party libraries (LangChain, CrewAI), or even use other agents as tools.


Deployment Ready

Containerize and deploy your agents anywhere – run locally, scale with Vertex AI Agent Engine, or integrate into custom infrastructure using Cloud Run or Docker.


Built-in Evaluation

Systematically assess agent performance by evaluating both the final response quality and the step-by-step execution trajectory against predefined test cases.


Building Safe and Secure Agents

Learn how to build powerful and trustworthy agents by implementing security and safety patterns and best practices in your agent's design.



What does Google ADK Include?

If you want external-facing conversational agents that can integrate with human support teams and existing telephony and communication platforms, choose the Customer Engagement Suite and its Conversational Agents.

If you want internal search to accelerate knowledge exchange throughout your organization, across your drives, chat, mail, ticketing platforms, databases, and more, including AI assistant support, choose Agentspace.

If you want to build something custom, you can use basic building blocks and start building from the ground up, for example with the Google Gen AI SDK or LangChain, where you will also need to make decisions about infrastructure and hosting.

If you want the freedom of custom development with support for communication between agents through conversation history and shared state, choose the Agent Development Kit.

The Agent Development Kit makes it easier to build multi-agent systems, while handling the challenges of agent communication for you.

It also frees you from infrastructure decisions through deployment to Agent Engine, a fully managed runtime, so you can focus on building logic and interactions between agents while resources are allocated and autoscaled for you.

The Google Agent Development Kit (or Google ADK) is designed to empower developers to build, manage, evaluate and deploy AI-powered agents.

The Agent Development Kit is a client-side Python SDK, enabling developers to quickly build and customize multi-agent systems.

While providing core tools, it allows developers to easily integrate and reuse tools from other popular agent frameworks (like LangChain and CrewAI), leveraging existing investments and community contributions.

ADK also makes evaluation easier.

And provides a convenient, user-friendly local development user interface, with tools to help debug your agents and multi-agent systems.

Google ADK provides callbacks that can be used to invoke functions during various stages of a flow.

As well as session memory for stateful conversations, which enables agents to recall information about a user across multiple sessions, providing long-term context (in addition to short-term session State).

It integrates artifact storage to facilitate agent collaboration on documents.

And, Google ADK can be deployed to Agent Engine, for fully-managed agent infrastructure.

Google ADK is built around a few key primitives and concepts that make it powerful and flexible: The Agent is the fundamental worker unit designed for specific tasks.

Agents can use language models for complex reasoning, or to act as controllers to manage workflows.

Agents can coordinate complex tasks, delegate sub-tasks using LLM-driven transfer, or explicit Agent Tool invocation, enabling modular and scalable solutions.

With native streaming support, you can build real-time, interactive experiences with bi-directional streaming of text and audio.

This integrates seamlessly with underlying capabilities like the Gemini Live API, often enabled with simple configuration changes.

Artifact Management allows agents to save, load, and manage versioned artifacts (files or binary data, like images, documents, or generated reports) associated with a session or user during their execution.

Google ADK provides a rich tool ecosystem, which equips agents with diverse capabilities.

It supports integrating custom functions, using other agents as tools, leveraging built-in functionalities like code execution, and interacting with external data sources and APIs.


Support for long-running tools allows handling asynchronous operations effectively.


There is also integrated developer tooling, so that you can develop and iterate locally with ease.


Google ADK includes tools like a command-line interface (CLI) and a Web UI for running agents, inspecting execution steps, debugging interactions, and visualizing agent definitions.


Session Management handles the context of a single conversation (the Session), including its history (as Events) and the agent's working memory for that conversation (the State).


An Event is the basic unit of communication representing things that happen during a session (such as user message, agent reply, and tool use), forming the conversation history.


And Memory enables agents to recall information about a user across multiple sessions, providing long-term context; this is distinct from short-term session State.


Google ADK provides flexible orchestration that enables you to define complex agent workflows using built-in workflow agents alongside LLM-driven dynamic routing.


This allows for both predictable pipelines and adaptive agent behavior.


As part of this orchestration Google ADK uses a Runner, which is the engine that manages the execution flow, orchestrates agent interactions based on Events, and coordinates with backend services.


Google ADK has built-in Agent evaluation, which means you can assess agent performance systematically.


The framework includes tools to create multi-turn evaluation datasets, and run evaluations locally, through the CLI or UI, to measure quality and guide improvements.


And Code Execution provides the ability for agents (usually using Tools) to generate and execute code, to perform complex calculations or actions.


Callbacks are custom code snippets you provide to run at specific points in the agent's process, allowing for checks, logging, or behavior modifications.
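As a rough illustration of the idea, the sketch below registers a before_model_callback that logs each outgoing model request; the import paths and the callback signature reflect my reading of the ADK docs and may vary between versions, so treat them as assumptions rather than the definitive API.

from typing import Optional

from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmRequest, LlmResponse


def log_model_call(callback_context: CallbackContext, llm_request: LlmRequest) -> Optional[LlmResponse]:
    # Runs just before the LLM is called; returning None lets the call proceed,
    # while returning an LlmResponse would skip the model call entirely.
    print(f"[{callback_context.agent_name}] sending a request to the model")
    return None


logged_agent = LlmAgent(
    name="LoggedAgent",
    model="gemini-2.0-flash",
    instruction="Answer the user's question.",
    before_model_callback=log_model_call
)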


Google ADK deploys to Agent Engine, a fully managed Google Cloud service enabling developers to deploy, manage, and scale AI agents in production.


Agent Engine handles the infrastructure to scale agents in production, so you can focus on creating intelligent and impactful applications.


And Planning is an advanced capability where agents can break down complex goals into smaller steps and plan how to achieve them, for example with a ReAct planner.


As part of the interactive developer tooling, Google ADK provides you tools to help debug your agents, interactions and multi-agent systems.


Your application traces will be collected by Cloud Trace, a tracing system that collects latency data from your distributed applications and displays it in the Google Cloud console.


Cloud Trace can capture traces from applications deployed on Agent Engine, and it can help you debug the different calls performed between your LLM agent and its tools, before returning a response to the user.


Finally, models are the underlying Large Language Models, like Gemini or Claude, that power ADK's LLM Agents, enabling their reasoning and language understanding abilities.


While optimized for Google’s Gemini models, the framework is designed for flexibility, allowing integration with various LLMs, potentially including open-source or fine-tuned models, through its Base LLM interface.


An agent can execute the steps of a certain workflow to accomplish a goal, and can access any required external systems and tools to do so.


There are four main components for an agent: The models are used to reason over goals, determine the plan and generate a response.


An agent can use multiple models.


Tools are used to fetch data, perform actions or transactions by calling other APIs or services.


Orchestration is the mechanism for configuring the steps required to complete a task, the logic for processing those steps, and access to the required tools.


It maintains memory and state, including the approach used to plan, and any data provided or fetched, as well as the necessary tools.


And the runtime is used to execute the system when invoked after receiving a query from an end user.



Core Concepts of Agent Development Kit

Google ADK is built around a few core concepts that make it powerful and flexible:


Agent: Agents are core building blocks designed to accomplish specific tasks. They can be powered by LLMs to reason, plan, and utilize tools to achieve goals, and can even collaborate on complex projects.

Tools: Tools give agents abilities beyond conversation, letting them interact with external APIs, search information, run code, or call other services.

Session Services: Session services handle the context of a single conversation (Session), including its history (Events) and the agent's working memory for that conversation (State).

Callbacks: Custom code snippets you provide to run at specific points in the agent's process, allowing for checks, logging, or behavior modifications.

Artifact Management: Artifacts allow agents to save, load, and manage files or binary data (like images or PDFs) associated with a session or user.

Runner: The engine that manages the execution flow, orchestrates agent interactions based on Events, and coordinates with backend services.




InMemoryRunner()


The Runner is the code responsible for receiving the user's query, passing it to the appropriate agent, receiving the agent's response event and passing it back to the calling application or UI for rendering, and then triggering the following event.


runner.session_service.create_session()

Sessions allow an agent to preserve state, remembering a list of items, the current status of a task, or other 'current' information. This class creates a local session service for simplicity, but in production this could be handled by a database.



types.Content() and types.Part()

Instead of a simple string, the agent is passed a Content object which can consist of multiple Parts. This allows for complex messages, including text and multimodal content to be passed to the agent in a specific order.



When you ran the agent in the dev UI, it created a session service, artifact service, and runner for you. When you write your own agents to deploy programmatically, it is recommended that you provide these components as external services rather than relying on in-memory versions.
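Putting these pieces together, a minimal programmatic run could look like the sketch below; it assumes a recent ADK release where create_session is asynchronous and that greeting_agent is an LlmAgent you have already defined, so adjust the names and method signatures to your installed version.

import asyncio

from google.adk.runners import InMemoryRunner
from google.genai import types


async def main():
    runner = InMemoryRunner(agent=greeting_agent, app_name="demo_app")
    # Create a session so the agent can keep state across turns.
    session = await runner.session_service.create_session(app_name="demo_app", user_id="user_1")
    # Wrap the user input in a Content object made up of Parts.
    message = types.Content(role="user", parts=[types.Part(text="Hello, agent!")])
    async for event in runner.run_async(user_id="user_1", session_id=session.id, new_message=message):
        if event.is_final_response() and event.content:
            print(event.content.parts[0].text)


asyncio.run(main())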




Wednesday, July 16, 2025

What is Serverless Workflow Specification?

The Serverless Workflow Specification is an open-source, vendor-neutral standard for defining workflows in serverless environments. It provides a declarative language (a Domain-Specific Language or DSL) for describing how serverless functions and other services should be orchestrated, enabling developers to build and manage complex event-driven applications. The specification is hosted by the Cloud Native Computing Foundation (CNCF). 

Here's a more detailed explanation:

Declarative Language:

The Serverless Workflow Specification uses a DSL to define workflows, allowing developers to focus on the logic and desired outcome of the workflow rather than the underlying implementation details. 

Vendor Neutrality:

The specification aims to be platform-independent, meaning workflows defined using it can be executed on various serverless platforms. 

Event-Driven Focus:

The specification is designed for orchestrating event-driven applications, where workflows are triggered by events and can react to changes in the system. 

Common Language:

By providing a standard language, the specification aims to improve portability and interoperability of serverless workflows across different platforms and tools. 

Key Components:

DSL: The core of the specification is a domain-specific language that defines the structure and behavior of workflows. 

SDKs: Software Development Kits (SDKs) are available in various languages (e.g., Java, Go, Python) to help developers interact with and build workflows. 

Runtimes: Dedicated environments (runtimes) are available to execute the defined workflows. 

Tooling: Tools are provided to assist with the development, debugging, and management of serverless workflows. 

YAML and JSON:

The Serverless Workflow DSL can be expressed using both YAML and JSON formats. 

Use Cases:

It is used to define workflows for event-driven, distributed services, and to bridge the gap between business logic and the underlying serverless technology. 
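To make the DSL concrete, the sketch below assembles a tiny single-state workflow as a Python dictionary and prints it as JSON; the field names loosely follow the 0.8-era Serverless Workflow DSL (id, specVersion, states, functionRef) and are meant to illustrate the general shape of a definition, not to serve as an authoritative schema.

import json

# A hypothetical one-step workflow: when started, it invokes a 'greet' function.
greeting_workflow = {
    "id": "greeting",
    "version": "1.0",
    "specVersion": "0.8",
    "name": "Greeting Workflow",
    "start": "Greet",
    "functions": [
        {"name": "greet", "operation": "file://apis/greeting.json#greet"}
    ],
    "states": [
        {
            "name": "Greet",
            "type": "operation",
            "actions": [{"functionRef": "greet"}],
            "end": True
        }
    ],
}

# The same definition could equally be expressed in YAML, as noted above.
print(json.dumps(greeting_workflow, indent=2))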

What are the common errors for Graph building in GraphRAG?

 


Common Causes and Solutions:


LLM Failure to Generate Valid JSON/Reports:


The most common reason for create_community_reports to fail is that the LLM is unable to produce the expected JSON format for the reports. GraphRAG provides prompts to the LLM to generate these summaries in a specific structure. If the LLM's response is malformed, empty, or not in JSON, GraphRAG will consider it a failure.


Solution:


Check LLM Response Quality: If your LLM (DeepSeek in this case) is not well-tuned for instruction following or JSON output, it might struggle. You can try testing your FastAPI endpoint with a prompt designed to elicit JSON to see if DeepSeek is reliably returning it (see the sketch after this list).


Adjust LLM Parameters:


temperature: A very low temperature (e.g., 0.0) can sometimes make models "stick" to undesired patterns or get stuck. Try a slightly higher value like 0.1 or 0.2 in your settings.yaml for default_chat_model to allow for a little more creativity while still being deterministic.


max_tokens: Ensure max_tokens is sufficient for a meaningful report. If reports are being cut off, they might be incomplete and unparseable. GraphRAG's default is often 4000, which should be plenty.


LLM Model Choice: While deepseek-chat is generally good, some models are better at strict JSON generation than others. If you have access to a model specifically fine-tuned for structured output or function calling, that might perform better.
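As a quick way to sanity-check the endpoint mentioned above, the sketch below sends one JSON-eliciting prompt to the local OpenAI-compatible FastAPI server and tries to parse the reply; the URL, API key, and model name mirror the settings.yaml shown later in this post, and the assumption is that the proxy exposes the standard /v1/chat/completions route.

import json

import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",
    headers={"Authorization": "Bearer sk-graphrag-llm"},
    json={
        "model": "deepseek-chat",
        "temperature": 0.1,
        "messages": [
            {"role": "user", "content": "Return ONLY a JSON object with keys 'title' and 'summary' describing the Eiffel Tower."}
        ],
    },
    timeout=120,
)
resp.raise_for_status()
content = resp.json()["choices"][0]["message"]["content"]

try:
    print("Valid JSON:", json.loads(content))
except json.JSONDecodeError:
    print("Model did not return parseable JSON:\n", content)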


LLM Context Window/Token Limits:


If the input chunks for community reports are very large, your LLM might be exceeding its context window, leading to truncated or failed responses.


Solution:


Adjust chunk_size in settings.yaml: In the indexing section of your settings.yaml, try reducing the chunk_size. This will create smaller chunks of text, which in turn lead to smaller inputs for the LLM when generating community reports.



# ...

indexing:

  chunk_size: 512 # Try reducing this from default (e.g., 1024 or 2048)

  # ... other indexing settings

# ...


Verify max_tokens on your FastAPI server: Ensure the max_tokens you pass to DeepSeek is appropriate for the model and allows for a full response.


Authentication/Connection Issues to DeepSeek:


While your FastAPI server starts up, there could still be intermittent connection or authentication issues when the actual LLM calls are made for create_community_reports. The None output could imply the call itself failed or timed out.


Solution:


FastAPI Server Logs: While GraphRAG is running, keep an eye on your FastAPI server's console. Look for any ERROR or WARNING messages, especially around the time GraphRAG tries to create community reports. You might see OpenAIError or HTTPException logs there.


DeepSeek API Key: Double-check your DEEPSEEK_API_KEY environment variable on the machine running the FastAPI server. Ensure it's correct and has access to the deepseek-chat model.


Network Connectivity: Verify that the machine running your FastAPI server has stable internet connectivity to https://api.deepseek.com.


GraphRAG Prompts:


GraphRAG uses internal prompts for these steps. Sometimes, if the prompts are too complex or the LLM is sensitive to prompt wording, it can fail.


Solution:


Update GraphRAG: Ensure you're using the latest version of GraphRAG. Newer versions often include prompt improvements and bug fixes.


Custom Prompts (Advanced): GraphRAG allows you to customize the prompts used for various steps. This is an advanced step, but if all else fails, you could try inspecting the default community_reports_extractor.py (or similar) in the GraphRAG source code to understand the expected prompt format and potentially provide your own in settings.yaml. This is generally only needed if you have highly specialized needs or are using a very unusual LLM.


Recommended Steps to Fix:


Start your FastAPI server. Keep its terminal open and visible.


Verify DEEPSEEK_API_KEY is correctly set as an environment variable in the terminal where you launch your FastAPI server.


Modify your GraphRAG settings.yaml:


Add encoding_model: cl100k_base at the top level (if you haven't already).


For default_chat_model, try setting temperature: 0.1 or 0.2.


Consider adding an indexing section if it's not there, and setting chunk_size: 512.


encoding_model: cl100k_base


models:

  default_chat_model:

    type: openai_chat

    api_base: http://localhost:8000/v1

    api_key: "sk-graphrag-llm"

    model: "deepseek-chat"

    temperature: 0.1 # <--- Try this

    max_tokens: 4000

    request_timeout: 600.0

    concurrent_requests: 5

    max_retries: 5


  default_embedding_model:

    type: openai_embedding

    api_base: http://localhost:8000/v1

    api_key: "sk-graphrag-embedding"

    model: "all-MiniLM-L6-v2"

    batch_size: 16

    batch_max_tokens: 8191

    concurrent_requests: 5

    max_retries: 5


indexing: # <--- Add this section if not present

  chunk_size: 512 # <--- Try this

  # Other indexing parameters...


# ... rest of your settings


Run the GraphRAG indexer again.

Observe the output in both the GraphRAG terminal and your FastAPI server's terminal for any new errors or clues. The FastAPI logs will tell you if the request to DeepSeek was successful or failed at the API level.


By systematically adjusting these parameters and monitoring logs, you should be able to pinpoint why create_community_reports is failing. Most often, it's the LLM not returning the expected JSON or hitting token limits.


Monday, July 14, 2025

What is Sharding in Graph databases?

Explanation of Components and Flow:

Client Application: This is your application (e.g., web server, microservice) that needs to read from or write to the database. It doesn't typically know about the individual shards directly.

Sharding Proxy / Query Router:

This is a crucial intermediary layer (sometimes built into the application, but often a separate service or a database's built-in feature, like MongoDB's mongos or ArangoDB's Coordinators).

Its main job is to abstract away the sharding complexity from the application.

It receives all database requests (reads and writes) from the client application.

Shard Key Logic:

Located within the Sharding Proxy or a closely integrated component.

This is where the sharding strategy is applied.

It determines which shard (or shards) a particular piece of data belongs to based on the shard key extracted from the query (a minimal routing sketch follows this component list).

Configuration / Metadata Store:

A central repository that stores vital information about the sharded cluster:

Shard Mapping Rules: How the shard key values map to specific shards (e.g., "users with IDs 1-1000 go to Shard 1").

Shard Status & Location: Which shards are online, their network addresses, etc.

Data Distribution: Overall statistics on how data is distributed.

The Shard Key Logic consults this store to make routing decisions.

Shards (DB Instance 1, 2, ..., N):

These are individual, independent database instances.

Each shard holds a unique subset of the total data (a "logical shard" is the data subset, the DB instance is the "physical shard").

Each shard typically has the same schema as the original logical database.

Shards can themselves be replicated for high availability (e.g., a replica set for each shard).

Data Subset 1, 2, ..., N:

Represents the actual data stored on each corresponding shard.

Rebalancing / Admin Tools:

Tools and processes used by administrators or automated systems to manage the sharded cluster.

Rebalancing: As data grows or access patterns change, shards can become unbalanced (some "hotter" than others). Rebalancing involves moving chunks of data between shards to ensure an even distribution of load and storage.

Adding/Removing Shards: These tools facilitate scaling out or in the cluster by adding new database instances as shards or removing old ones.
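To make the routing step concrete, here is a minimal sketch of hash-based shard key logic of the kind a proxy layer applies; the shard names and the modulo-hash strategy are illustrative assumptions (real systems such as MongoDB's mongos consult range or hashed chunk maps held in the metadata store).

import hashlib

SHARDS = ["shard_1", "shard_2", "shard_3"]  # stand-ins for the physical DB instances


def shard_for_key(shard_key: str) -> str:
    # Hash the shard key and map it deterministically onto one shard.
    digest = hashlib.sha1(shard_key.encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]


def route_write(user_id: str) -> str:
    # Single-shard write: the shard key (user_id) fully determines the target.
    return shard_for_key(user_id)


def route_read(user_id=None):
    # Single-shard read when the shard key is in the query; otherwise fan out.
    return [shard_for_key(user_id)] if user_id else list(SHARDS)


print(route_write("user_42"))  # routed to exactly one shard
print(route_read())            # no shard key -> fan-out to every shard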

Workflow for a Write Request (e.g., INSERT):

Client Application sends an INSERT query to the Sharding Proxy.

The Sharding Proxy extracts the relevant shard key value from the data to be inserted.

The Shard Key Logic consults the Configuration/Metadata Store to determine which specific Shard this data should reside on based on the shard key and mapping rules.

The Sharding Proxy routes the INSERT query directly to the identified Shard.

The Shard processes the write and stores the Data Subset on its local storage.

Workflow for a Read Request (e.g., SELECT):

Client Application sends a SELECT query to the Sharding Proxy.

The Sharding Proxy extracts the shard key (if present in the query conditions).

The Shard Key Logic determines if the query can be fulfilled by a single shard or if it needs to go to multiple shards:

Single Shard Query: If the shard key is part of the query condition (e.g., SELECT * FROM users WHERE user_id = 'XYZ'), the Shard Key Logic identifies the specific Shard where user_id 'XYZ' resides. The query is then routed directly to that shard, which returns the result. This is the most efficient type of sharded query.

Multi-Shard / Fan-out Query: If the query does not include the shard key (e.g., SELECT * FROM users WHERE city = 'New York') or requires aggregating data across multiple partitions (e.g., SELECT COUNT(*) FROM users), the Sharding Proxy might "fan out" the query to all relevant Shards. Each shard executes its portion of the query, and the Sharding Proxy collects and aggregates the results before returning them to the client. This type of query is generally less efficient due to network overhead and potential data aggregation complexity.

This walkthrough provides a high-level overview of how sharding works to distribute data and queries in a scalable database system.