diff --git a/tests/unit/vertexai/genai/replays/test_create_agent_engine.py b/tests/unit/vertexai/genai/replays/test_create_agent_engine.py index 16dd0c6c68..aeb6599968 100644 --- a/tests/unit/vertexai/genai/replays/test_create_agent_engine.py +++ b/tests/unit/vertexai/genai/replays/test_create_agent_engine.py @@ -90,12 +90,23 @@ def test_create_with_context_spec(client): memory_bank_customization_config = types.MemoryBankCustomizationConfig( **customization_config ) + generation_trigger_config = { + "generation_rule": { + "idle_duration": "300s", + }, + } + generation_trigger_config_obj = types.MemoryGenerationTriggerConfig( + **generation_trigger_config + ) agent_engine = client.agent_engines.create( config={ "context_spec": { "memory_bank_config": { - "generation_config": {"model": generation_model}, + "generation_config": { + "model": generation_model, + "generation_trigger_config": generation_trigger_config_obj, + }, "similarity_search_config": { "embedding_model": embedding_model, }, @@ -109,6 +120,10 @@ def test_create_with_context_spec(client): agent_engine = client.agent_engines.get(name=agent_engine.api_resource.name) memory_bank_config = agent_engine.api_resource.context_spec.memory_bank_config assert memory_bank_config.generation_config.model == generation_model + assert ( + memory_bank_config.generation_config.generation_trigger_config + == generation_trigger_config_obj + ) assert ( memory_bank_config.similarity_search_config.embedding_model == embedding_model ) diff --git a/tests/unit/vertexai/genai/replays/test_ingest_events_memory_bank.py b/tests/unit/vertexai/genai/replays/test_ingest_events_memory_bank.py new file mode 100644 index 0000000000..b4a4ccb820 --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_ingest_events_memory_bank.py @@ -0,0 +1,91 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=protected-access,bad-continuation,missing-function-docstring + +from tests.unit.vertexai.genai.replays import pytest_helper + + +def test_ingest_events(client): + agent_engine = client.agent_engines.create() + assert not list( + client.agent_engines.memories.list( + name=agent_engine.api_resource.name, + ) + ) + scope = {"user_id": "test-user-id"} + # Generate memories using source content. This result is non-deterministic, + # because an LLM is used to generate the memories. + client.agent_engines.memories.ingest_events( + name=agent_engine.api_resource.name, + scope=scope, + direct_contents_source={ + "events": [ + { + "content": { + "role": "user", + "parts": [{"text": "I like dogs."}], + } + } + ] + }, + generation_trigger_config={"generation_rule": {"idle_duration": "60s"}}, + ) + memories = list( + client.agent_engines.memories.retrieve( + name=agent_engine.api_resource.name, + scope=scope, + ) + ) + # Ingest events should be asynchronous by default, so there should be no + # memories immediately after the call. Processing will only start after 60s + # of inactivity. 
+ assert len(memories) == 0 + + client.agent_engines.memories.ingest_events( + name=agent_engine.api_resource.name, + scope=scope, + direct_contents_source={ + "events": [ + { + "content": { + "role": "user", + "parts": [{"text": "I'm a software engineer."}], + } + } + ] + }, + config={"wait_for_completion": True, "force_flush": True}, + ) + memories = list( + client.agent_engines.memories.retrieve( + name=agent_engine.api_resource.name, + scope=scope, + simple_retrieval_params={ + "page_size": 1, + }, + ) + ) + # With `wait_for_completion` and `force_flush` set to True, there should be + # memories immediately after the call. + assert len(memories) >= 1 + + client.agent_engines.delete(name=agent_engine.api_resource.name, force=True) + + +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), + test_method="agent_engines.memories.ingest_events", +) diff --git a/vertexai/_genai/memories.py b/vertexai/_genai/memories.py index c105cf760e..f39a000160 100644 --- a/vertexai/_genai/memories.py +++ b/vertexai/_genai/memories.py @@ -261,6 +261,52 @@ def _GetAgentEngineMemoryRequestParameters_to_vertex( return to_object +def _IngestEventsConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["force_flush"]) is not None: + setv(parent_object, ["forceFlush"], getv(from_object, ["force_flush"])) + + return to_object + + +def _IngestEventsRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["stream_id"]) is not None: + setv(to_object, ["streamId"], getv(from_object, ["stream_id"])) + + if getv(from_object, ["direct_contents_source"]) is not None: + setv( + 
to_object, + ["directContentsSource"], + getv(from_object, ["direct_contents_source"]), + ) + + if getv(from_object, ["scope"]) is not None: + setv(to_object, ["scope"], getv(from_object, ["scope"])) + + if getv(from_object, ["generation_trigger_config"]) is not None: + setv( + to_object, + ["generationTriggerConfig"], + getv(from_object, ["generation_trigger_config"]), + ) + + if getv(from_object, ["config"]) is not None: + _IngestEventsConfig_to_vertex(getv(from_object, ["config"]), to_object) + + return to_object + + def _ListAgentEngineMemoryConfig_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, @@ -805,6 +851,88 @@ def get( self._api_client._verify_response(return_value) return return_value + def _ingest_events( + self, + *, + name: str, + stream_id: Optional[str] = None, + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + scope: Optional[dict[str, str]] = None, + generation_trigger_config: Optional[ + types.MemoryGenerationTriggerConfigOrDict + ] = None, + config: Optional[types.IngestEventsConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """ + Ingest events into a Memory Bank. 
+ """ + + parameter_model = types._IngestEventsRequestParameters( + name=name, + stream_id=stream_id, + direct_contents_source=direct_contents_source, + scope=scope, + generation_trigger_config=generation_trigger_config, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _IngestEventsRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:ingestEvents".format_map(request_url_dict) + else: + path = "{name}/memories:ingestEvents" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("post", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.MemoryBankIngestEventsOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + def _list( self, *, @@ -1727,6 +1855,89 @@ def purge( raise RuntimeError(f"Failed to purge memories: 
{operation.error}") return operation + + def ingest_events( + self, + *, + name: str, + scope: dict[str, str], + stream_id: str = "", + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + generation_trigger_config: Optional[ + types.MemoryGenerationTriggerConfigOrDict + ] = None, + config: Optional[types.IngestEventsConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """Ingests events into an Agent Engine. + + Example usage: + ``` + client.agent_engines.memories.ingest_events( + name="projects/test-project/locations/us-central1/reasoningEngines/test-agent-engine", + scope={"user_id": "test-user-id"}, + direct_contents_source={ + "events": [ + { + "content": { + "role": "user", + "parts": [ + {"text": "I am a software engineer."} + ], + } + } + ] + }, + generation_trigger_config={ + "generation_rule": { + "idle_duration": "60s" + } + } + ) + ``` + + Args: + name (str): + Required. The name of the Agent Engine to ingest events into. + scope (dict[str, str]): + Required. The scope of the events to ingest. For example, + {"user_id": "123"}. + stream_id (str): + Optional. The ID of the stream to ingest events into. If not + specified, the events will be ingested into the default stream. + direct_contents_source (IngestionDirectContentsSource): + The direct contents source, containing the events to ingest. + generation_trigger_config (MemoryGenerationTriggerConfig): + Optional. The configuration that controls when memory generation + is triggered for the ingested events. + config (IngestEventsConfig): + Optional. The configuration for the ingest events operation. + + Returns: + MemoryBankIngestEventsOperation: + The operation for ingesting the events. 
+ """ + if config is None: + config = types.IngestEventsConfig() + elif isinstance(config, dict): + config = types.IngestEventsConfig.model_validate(config) + operation = self._ingest_events( + name=name, + scope=scope, + stream_id=stream_id, + generation_trigger_config=generation_trigger_config, + direct_contents_source=direct_contents_source, + config=config, + ) + if config.wait_for_completion and not operation.done: + operation = _agent_engines_utils._await_operation( + operation_name=operation.name, + get_operation_fn=self._get_memory_operation, + poll_interval_seconds=0.5, + ) + if operation.error: + raise RuntimeError(f"Failed to ingest events: {operation.error}") + return operation + class AsyncMemories(_api_module.BaseModule): @@ -2058,6 +2269,90 @@ async def get( self._api_client._verify_response(return_value) return return_value + async def _ingest_events( + self, + *, + name: str, + stream_id: Optional[str] = None, + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + scope: Optional[dict[str, str]] = None, + generation_trigger_config: Optional[ + types.MemoryGenerationTriggerConfigOrDict + ] = None, + config: Optional[types.IngestEventsConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """ + Ingest events into a Memory Bank. 
+ """ + + parameter_model = types._IngestEventsRequestParameters( + name=name, + stream_id=stream_id, + direct_contents_source=direct_contents_source, + scope=scope, + generation_trigger_config=generation_trigger_config, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _IngestEventsRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:ingestEvents".format_map(request_url_dict) + else: + path = "{name}/memories:ingestEvents" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.MemoryBankIngestEventsOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + async def _list( self, *, @@ -2995,3 +3290,86 @@ async def purge( if 
operation.error: raise RuntimeError(f"Failed to purge memories: {operation.error}") return operation + + async def ingest_events( + self, + *, + name: str, + scope: dict[str, str], + stream_id: str = "", + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + generation_trigger_config: Optional[ + types.MemoryGenerationTriggerConfigOrDict + ] = None, + config: Optional[types.IngestEventsConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """Ingests events into an Agent Engine. + + Example usage: + ``` + await client.aio.agent_engines.memories.ingest_events( + name="projects/test-project/locations/us-central1/reasoningEngines/test-agent-engine", + scope={"user_id": "test-user-id"}, + direct_contents_source={ + "events": [ + { + "content": { + "role": "user", + "parts": [ + {"text": "I am a software engineer."} + ], + } + } + ] + }, + generation_trigger_config={ + "generation_rule": { + "idle_duration": "60s" + } + } + ) + ``` + + Args: + name (str): + Required. The name of the Agent Engine to ingest events into. + scope (dict[str, str]): + Required. The scope of the events to ingest. For example, + {"user_id": "123"}. + stream_id (str): + Optional. The ID of the stream to ingest events into. If not + specified, the events will be ingested into the default stream. + direct_contents_source (IngestionDirectContentsSource): + The direct contents source, containing the events to ingest. + generation_trigger_config (MemoryGenerationTriggerConfig): + Optional. The configuration that controls when memory generation + is triggered for the ingested events. + config (IngestEventsConfig): + Optional. The configuration for the ingest events operation. + + Returns: + MemoryBankIngestEventsOperation: + The operation for ingesting the events. 
+ """ + if config is None: + config = types.IngestEventsConfig() + elif isinstance(config, dict): + config = types.IngestEventsConfig.model_validate(config) + operation = await self._ingest_events( + name=name, + scope=scope, + stream_id=stream_id, + generation_trigger_config=generation_trigger_config, + direct_contents_source=direct_contents_source, + config=config, + ) + if config.wait_for_completion and not operation.done: + operation = await _agent_engines_utils._await_async_operation( + operation_name=operation.name, + get_operation_fn=self._get_memory_operation, + poll_interval_seconds=0.5, + ) + if operation.error: + raise RuntimeError(f"Failed to ingest events: {operation.error}") + return operation diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index 768abef4b5..8eecec97fa 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -78,6 +78,7 @@ from .common import _GetEvaluationSetParameters from .common import _GetMultimodalDatasetOperationParameters from .common import _GetMultimodalDatasetParameters +from .common import _IngestEventsRequestParameters from .common import _ListAgentEngineMemoryRequestParameters from .common import _ListAgentEngineMemoryRevisionsRequestParameters from .common import _ListAgentEngineRequestParameters @@ -544,6 +545,15 @@ from .common import GetPromptConfigOrDict from .common import IdentityType from .common import Importance +from .common import IngestEventsConfig +from .common import IngestEventsConfigDict +from .common import IngestEventsConfigOrDict +from .common import IngestionDirectContentsSource +from .common import IngestionDirectContentsSourceDict +from .common import IngestionDirectContentsSourceEvent +from .common import IngestionDirectContentsSourceEventDict +from .common import IngestionDirectContentsSourceEventOrDict +from .common import IngestionDirectContentsSourceOrDict from .common import IntermediateExtractedMemory from .common import 
IntermediateExtractedMemoryDict from .common import IntermediateExtractedMemoryOrDict @@ -695,6 +705,9 @@ from .common import MemoryBankCustomizationConfigMemoryTopicManagedMemoryTopicOrDict from .common import MemoryBankCustomizationConfigMemoryTopicOrDict from .common import MemoryBankCustomizationConfigOrDict +from .common import MemoryBankIngestEventsOperation +from .common import MemoryBankIngestEventsOperationDict +from .common import MemoryBankIngestEventsOperationOrDict from .common import MemoryConjunctionFilter from .common import MemoryConjunctionFilterDict from .common import MemoryConjunctionFilterOrDict @@ -702,6 +715,12 @@ from .common import MemoryFilter from .common import MemoryFilterDict from .common import MemoryFilterOrDict +from .common import MemoryGenerationTriggerConfig +from .common import MemoryGenerationTriggerConfigDict +from .common import MemoryGenerationTriggerConfigGenerationTriggerRule +from .common import MemoryGenerationTriggerConfigGenerationTriggerRuleDict +from .common import MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict +from .common import MemoryGenerationTriggerConfigOrDict from .common import MemoryMetadataMergeStrategy from .common import MemoryMetadataValue from .common import MemoryMetadataValueDict @@ -1720,6 +1739,12 @@ "MemoryBankCustomizationConfig", "MemoryBankCustomizationConfigDict", "MemoryBankCustomizationConfigOrDict", + "MemoryGenerationTriggerConfigGenerationTriggerRule", + "MemoryGenerationTriggerConfigGenerationTriggerRuleDict", + "MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict", + "MemoryGenerationTriggerConfig", + "MemoryGenerationTriggerConfigDict", + "MemoryGenerationTriggerConfigOrDict", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfig", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfigOrDict", @@ -1900,6 +1925,18 @@ "GetAgentEngineMemoryConfig", "GetAgentEngineMemoryConfigDict", 
"GetAgentEngineMemoryConfigOrDict", + "IngestionDirectContentsSourceEvent", + "IngestionDirectContentsSourceEventDict", + "IngestionDirectContentsSourceEventOrDict", + "IngestionDirectContentsSource", + "IngestionDirectContentsSourceDict", + "IngestionDirectContentsSourceOrDict", + "IngestEventsConfig", + "IngestEventsConfigDict", + "IngestEventsConfigOrDict", + "MemoryBankIngestEventsOperation", + "MemoryBankIngestEventsOperationDict", + "MemoryBankIngestEventsOperationOrDict", "ListAgentEngineMemoryConfig", "ListAgentEngineMemoryConfigDict", "ListAgentEngineMemoryConfigOrDict", @@ -2394,6 +2431,7 @@ "_DeleteAgentEngineMemoryRequestParameters", "_GenerateAgentEngineMemoriesRequestParameters", "_GetAgentEngineMemoryRequestParameters", + "_IngestEventsRequestParameters", "_ListAgentEngineMemoryRequestParameters", "_GetAgentEngineMemoryOperationParameters", "_GetAgentEngineGenerateMemoriesOperationParameters", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index c3ef22d4a1..fd771b8cb4 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -6828,6 +6828,65 @@ class MemoryBankCustomizationConfigDict(TypedDict, total=False): ] +class MemoryGenerationTriggerConfigGenerationTriggerRule(_common.BaseModel): + """Represents the active rule that determines when to flush the buffer.""" + + event_count: Optional[int] = Field( + default=None, + description="""Specifies to trigger generation when the event count reaches this limit.""", + ) + fixed_interval: Optional[str] = Field( + default=None, + description="""Specifies to trigger generation at a fixed interval. The duration must have a minute-level granularity.""", + ) + idle_duration: Optional[str] = Field( + default=None, + description="""Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. 
The duration must have a minute-level granularity.""", + ) + + +class MemoryGenerationTriggerConfigGenerationTriggerRuleDict(TypedDict, total=False): + """Represents the active rule that determines when to flush the buffer.""" + + event_count: Optional[int] + """Specifies to trigger generation when the event count reaches this limit.""" + + fixed_interval: Optional[str] + """Specifies to trigger generation at a fixed interval. The duration must have a minute-level granularity.""" + + idle_duration: Optional[str] + """Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. The duration must have a minute-level granularity.""" + + +MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict = Union[ + MemoryGenerationTriggerConfigGenerationTriggerRule, + MemoryGenerationTriggerConfigGenerationTriggerRuleDict, +] + + +class MemoryGenerationTriggerConfig(_common.BaseModel): + """The configuration for triggering memory generation for ingested events.""" + + generation_rule: Optional[MemoryGenerationTriggerConfigGenerationTriggerRule] = ( + Field( + default=None, + description="""Optional. Represents the active rule that determines when to flush the buffer. If not set, then the stream will be force flushed immediately.""", + ) + ) + + +class MemoryGenerationTriggerConfigDict(TypedDict, total=False): + """The configuration for triggering memory generation for ingested events.""" + + generation_rule: Optional[MemoryGenerationTriggerConfigGenerationTriggerRuleDict] + """Optional. Represents the active rule that determines when to flush the buffer. 
If not set, then the stream will be force flushed immediately.""" + + +MemoryGenerationTriggerConfigOrDict = Union[ + MemoryGenerationTriggerConfig, MemoryGenerationTriggerConfigDict +] + + class ReasoningEngineContextSpecMemoryBankConfigGenerationConfig(_common.BaseModel): """Configuration for how to generate memories.""" @@ -6835,6 +6894,10 @@ class ReasoningEngineContextSpecMemoryBankConfigGenerationConfig(_common.BaseMod default=None, description="""Optional. The model used to generate memories. Format: `projects/{project}/locations/{location}/publishers/google/models/{model}`.""", ) + generation_trigger_config: Optional[MemoryGenerationTriggerConfig] = Field( + default=None, + description="""Optional. Specifies the default trigger configuration for generating memories using `IngestEvents`.""", + ) class ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict( @@ -6845,6 +6908,9 @@ class ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict( model: Optional[str] """Optional. The model used to generate memories. Format: `projects/{project}/locations/{location}/publishers/google/models/{model}`.""" + generation_trigger_config: Optional[MemoryGenerationTriggerConfigDict] + """Optional. Specifies the default trigger configuration for generating memories using `IngestEvents`.""" + ReasoningEngineContextSpecMemoryBankConfigGenerationConfigOrDict = Union[ ReasoningEngineContextSpecMemoryBankConfigGenerationConfig, @@ -9952,6 +10018,193 @@ class _GetAgentEngineMemoryRequestParametersDict(TypedDict, total=False): ] +class IngestionDirectContentsSourceEvent(_common.BaseModel): + """The direct contents source event for ingesting events.""" + + content: Optional[genai_types.Content] = Field( + default=None, description="""Required. The content of the event.""" + ) + event_id: Optional[str] = Field( + default=None, + description="""Optional. A unique identifier for the event. 
If an event with the same event_id is ingested multiple times, it will be de-duplicated.""", + ) + event_time: Optional[datetime.datetime] = Field( + default=None, + description="""Optional. The time at which the event occurred. If provided, this timestamp will be used for ordering events within a stream. If not provided, the server-side ingestion time will be used.""", + ) + + +class IngestionDirectContentsSourceEventDict(TypedDict, total=False): + """The direct contents source event for ingesting events.""" + + content: Optional[genai_types.ContentDict] + """Required. The content of the event.""" + + event_id: Optional[str] + """Optional. A unique identifier for the event. If an event with the same event_id is ingested multiple times, it will be de-duplicated.""" + + event_time: Optional[datetime.datetime] + """Optional. The time at which the event occurred. If provided, this timestamp will be used for ordering events within a stream. If not provided, the server-side ingestion time will be used.""" + + +IngestionDirectContentsSourceEventOrDict = Union[ + IngestionDirectContentsSourceEvent, IngestionDirectContentsSourceEventDict +] + + +class IngestionDirectContentsSource(_common.BaseModel): + """The direct contents source for ingesting events.""" + + events: Optional[list[IngestionDirectContentsSourceEvent]] = Field( + default=None, description="""Required. The events to ingest.""" + ) + + +class IngestionDirectContentsSourceDict(TypedDict, total=False): + """The direct contents source for ingesting events.""" + + events: Optional[list[IngestionDirectContentsSourceEventDict]] + """Required. 
The events to ingest.""" + + +IngestionDirectContentsSourceOrDict = Union[ + IngestionDirectContentsSource, IngestionDirectContentsSourceDict +] + + +class IngestEventsConfig(_common.BaseModel): + """Config for ingesting events.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=False, + description="""Waits for the underlying memory generation operation to complete + before returning. Defaults to false.""", + ) + force_flush: Optional[bool] = Field( + default=None, + description="""Optional. Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""", + ) + + +class IngestEventsConfigDict(TypedDict, total=False): + """Config for ingesting events.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Waits for the underlying memory generation operation to complete + before returning. Defaults to false.""" + + force_flush: Optional[bool] + """Optional. 
Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""" + + +IngestEventsConfigOrDict = Union[IngestEventsConfig, IngestEventsConfigDict] + + +class _IngestEventsRequestParameters(_common.BaseModel): + """Parameters for ingesting events into an agent engine Memory Bank.""" + + name: Optional[str] = Field( + default=None, description="""Name of the Agent Engine to ingest events into.""" + ) + stream_id: Optional[str] = Field( + default=None, description="""The ID of the stream to ingest events into.""" + ) + direct_contents_source: Optional[IngestionDirectContentsSource] = Field( + default=None, + description="""The direct contents source of the events that should be ingested.""", + ) + scope: Optional[dict[str, str]] = Field( + default=None, + description="""The scope of the memories that should be generated from the stream. + + Memories will be consolidated across memories with the same scope. Scope + values cannot contain the wildcard character '*'.""", + ) + generation_trigger_config: Optional[MemoryGenerationTriggerConfig] = Field( + default=None, + description="""The configuration for the memory generation trigger.""", + ) + config: Optional[IngestEventsConfig] = Field(default=None, description="""""") + + +class _IngestEventsRequestParametersDict(TypedDict, total=False): + """Parameters for ingesting events into an agent engine Memory Bank.""" + + name: Optional[str] + """Name of the Agent Engine to ingest events into.""" + + stream_id: Optional[str] + """The ID of the stream to ingest events into.""" + + direct_contents_source: Optional[IngestionDirectContentsSourceDict] + """The direct contents source of the events that should be ingested.""" + + scope: Optional[dict[str, str]] + """The scope of the memories that should be generated from the stream. + + Memories will be consolidated across memories with the same scope. 
Scope + values cannot contain the wildcard character '*'.""" + + generation_trigger_config: Optional[MemoryGenerationTriggerConfigDict] + """The configuration for the memory generation trigger.""" + + config: Optional[IngestEventsConfigDict] + """""" + + +_IngestEventsRequestParametersOrDict = Union[ + _IngestEventsRequestParameters, _IngestEventsRequestParametersDict +] + + +class MemoryBankIngestEventsOperation(_common.BaseModel): + """Operation that ingests events into a memory bank.""" + + name: Optional[str] = Field( + default=None, + description="""The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""", + ) + metadata: Optional[dict[str, Any]] = Field( + default=None, + description="""Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""", + ) + done: Optional[bool] = Field( + default=None, + description="""If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""", + ) + error: Optional[dict[str, Any]] = Field( + default=None, + description="""The error result of the operation in case of failure or cancellation.""", + ) + + +class MemoryBankIngestEventsOperationDict(TypedDict, total=False): + """Operation that ingests events into a memory bank.""" + + name: Optional[str] + """The server-assigned name, which is only unique within the same service that originally returns it. 
If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""" + + metadata: Optional[dict[str, Any]] + """Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""" + + done: Optional[bool] + """If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""" + + error: Optional[dict[str, Any]] + """The error result of the operation in case of failure or cancellation.""" + + +MemoryBankIngestEventsOperationOrDict = Union[ + MemoryBankIngestEventsOperation, MemoryBankIngestEventsOperationDict +] + + class ListAgentEngineMemoryConfig(_common.BaseModel): """Config for listing agent engine memories."""