Since Bedrock supports a variety of date-stamped models, we explicitly list the latest models but allow any name in the type hints.
See the Bedrock docs for a full list.
ALL FIELDS MUST BE bedrock_ PREFIXED SO YOU CAN MERGE THEM WITH OTHER MODELS.
Source code in pydantic_ai_slim/pydantic_ai/models/bedrock.py
Lines 115–119
class BedrockModelSettings(ModelSettings):
    """Settings for Bedrock models.

    ALL FIELDS MUST BE `bedrock_` PREFIXED SO YOU CAN MERGE THEM WITH OTHER MODELS.
    """
@dataclass(init=False)
class BedrockConverseModel(Model):
    """A model that uses the Bedrock Converse API."""

    # boto3 Bedrock runtime client; all API calls go through this (it is synchronous,
    # so calls are dispatched to a worker thread — see `_messages_create`).
    client: BedrockRuntimeClient

    _model_name: BedrockModelName = field(repr=False)
    _system: str = field(default='bedrock', repr=False)

    @property
    def model_name(self) -> str:
        """The model name."""
        return self._model_name

    @property
    def system(self) -> str:
        """The system / model provider, ex: openai."""
        return self._system

    def __init__(
        self,
        model_name: BedrockModelName,
        *,
        provider: Literal['bedrock'] | Provider[BaseClient] = 'bedrock',
    ):
        """Initialize a Bedrock model.

        Args:
            model_name: The name of the Bedrock model to use. List of model names available
                [here](https://docs.aws.amazon.com/bedrock/latest/userguide/models-supported.html).
            provider: The provider to use for authentication and API access. Can be either the string
                'bedrock' or an instance of `Provider[BaseClient]`. If not provided, a new provider will be
                created using the other parameters.
        """
        self._model_name = model_name
        # A bare provider name is resolved to a Provider instance; an instance is used as-is.
        if isinstance(provider, str):
            provider = infer_provider(provider)
        self.client = cast('BedrockRuntimeClient', provider.client)

    def _get_tools(self, model_request_parameters: ModelRequestParameters) -> list[ToolTypeDef]:
        """Map both function tools and result tools to Bedrock `toolSpec` entries."""
        tools = [self._map_tool_definition(r) for r in model_request_parameters.function_tools]
        if model_request_parameters.result_tools:
            tools += [self._map_tool_definition(r) for r in model_request_parameters.result_tools]
        return tools

    @staticmethod
    def _map_tool_definition(f: ToolDefinition) -> ToolTypeDef:
        """Convert a pydantic_ai `ToolDefinition` into the Bedrock `toolSpec` shape."""
        return {
            'toolSpec': {
                'name': f.name,
                'description': f.description,
                'inputSchema': {'json': f.parameters_json_schema},
            }
        }

    @property
    def base_url(self) -> str:
        # The endpoint the boto3 client was configured with.
        return str(self.client.meta.endpoint_url)

    async def request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> tuple[ModelResponse, result.Usage]:
        """Make a non-streaming Converse request and map the response to pydantic_ai types."""
        response = await self._messages_create(messages, False, model_settings, model_request_parameters)
        return await self._process_response(response)

    @asynccontextmanager
    async def request_stream(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> AsyncIterator[StreamedResponse]:
        """Make a ConverseStream request, yielding a `BedrockStreamedResponse` over the event stream."""
        response = await self._messages_create(messages, True, model_settings, model_request_parameters)
        yield BedrockStreamedResponse(_model_name=self.model_name, _event_stream=response)

    async def _process_response(self, response: ConverseResponseTypeDef) -> tuple[ModelResponse, result.Usage]:
        """Translate a raw Converse response into a `ModelResponse` plus token usage."""
        items: list[ModelResponsePart] = []
        if message := response['output'].get('message'):
            for item in message['content']:
                if text := item.get('text'):
                    items.append(TextPart(content=text))
                else:
                    # Anything that is not text must be a tool-use block.
                    tool_use = item.get('toolUse')
                    assert tool_use is not None, f'Found a content that is not a text or tool use: {item}'
                    items.append(
                        ToolCallPart(
                            tool_name=tool_use['name'],
                            args=tool_use['input'],
                            tool_call_id=tool_use['toolUseId'],
                        ),
                    )
        usage = result.Usage(
            request_tokens=response['usage']['inputTokens'],
            response_tokens=response['usage']['outputTokens'],
            total_tokens=response['usage']['totalTokens'],
        )
        return ModelResponse(items, model_name=self.model_name), usage

    @overload
    async def _messages_create(
        self,
        messages: list[ModelMessage],
        stream: Literal[True],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> EventStream[ConverseStreamOutputTypeDef]:
        pass

    @overload
    async def _messages_create(
        self,
        messages: list[ModelMessage],
        stream: Literal[False],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ConverseResponseTypeDef:
        pass

    async def _messages_create(
        self,
        messages: list[ModelMessage],
        stream: bool,
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ConverseResponseTypeDef | EventStream[ConverseStreamOutputTypeDef]:
        """Build the Converse/ConverseStream request payload and issue the API call.

        Returns the raw boto3 response (non-streaming) or the raw event stream (streaming).
        """
        tools = self._get_tools(model_request_parameters)
        # Only Anthropic model ids are treated as supporting `toolChoice` here; for all
        # other models an empty choice is sent (i.e. the field is omitted from the payload).
        support_tools_choice = self.model_name.startswith(('anthropic', 'us.anthropic'))
        if not tools or not support_tools_choice:
            tool_choice: ToolChoiceTypeDef = {}
        elif not model_request_parameters.allow_text_result:
            # A tool call is required when plain-text results are not allowed.
            tool_choice = {'any': {}}
        else:
            tool_choice = {'auto': {}}
        system_prompt, bedrock_messages = await self._map_message(messages)
        inference_config = self._map_inference_config(model_settings)
        params = {
            'modelId': self.model_name,
            'messages': bedrock_messages,
            'system': [{'text': system_prompt}],
            'inferenceConfig': inference_config,
            # `toolConfig` (and `toolChoice` within it) is only included when non-empty.
            **(
                {'toolConfig': {'tools': tools, **({'toolChoice': tool_choice} if tool_choice else {})}}
                if tools
                else {}
            ),
        }
        # The boto3 client is synchronous, so run the call in a worker thread.
        if stream:
            model_response = await anyio.to_thread.run_sync(functools.partial(self.client.converse_stream, **params))
            model_response = model_response['stream']
        else:
            model_response = await anyio.to_thread.run_sync(functools.partial(self.client.converse, **params))
        return model_response

    @staticmethod
    def _map_inference_config(
        model_settings: ModelSettings | None,
    ) -> InferenceConfigurationTypeDef:
        """Translate pydantic_ai `ModelSettings` keys into Bedrock `inferenceConfig` keys."""
        model_settings = model_settings or {}
        inference_config: InferenceConfigurationTypeDef = {}
        if max_tokens := model_settings.get('max_tokens'):
            inference_config['maxTokens'] = max_tokens
        if temperature := model_settings.get('temperature'):
            inference_config['temperature'] = temperature
        if top_p := model_settings.get('top_p'):
            inference_config['topP'] = top_p
        # TODO(Marcelo): This is not included in model_settings yet.
        # if stop_sequences := model_settings.get('stop_sequences'):
        #     inference_config['stopSequences'] = stop_sequences
        return inference_config

    async def _map_message(self, messages: list[ModelMessage]) -> tuple[str, list[MessageUnionTypeDef]]:
        """Just maps a `pydantic_ai.Message` to the Bedrock `MessageUnionTypeDef`.

        Returns the concatenated system prompt text and the list of Bedrock messages.
        """
        system_prompt: str = ''
        bedrock_messages: list[MessageUnionTypeDef] = []
        for m in messages:
            if isinstance(m, ModelRequest):
                for part in m.parts:
                    if isinstance(part, SystemPromptPart):
                        # System prompts are concatenated and sent via the top-level `system` field.
                        system_prompt += part.content
                    elif isinstance(part, UserPromptPart):
                        bedrock_messages.extend(await self._map_user_prompt(part))
                    elif isinstance(part, ToolReturnPart):
                        assert part.tool_call_id is not None
                        bedrock_messages.append(
                            {
                                'role': 'user',
                                'content': [
                                    {
                                        'toolResult': {
                                            'toolUseId': part.tool_call_id,
                                            'content': [{'text': part.model_response_str()}],
                                            'status': 'success',
                                        }
                                    }
                                ],
                            }
                        )
                    elif isinstance(part, RetryPromptPart):
                        # TODO(Marcelo): We need to add a test here.
                        if part.tool_name is None:  # pragma: no cover
                            # Retry not tied to a tool: send as plain user text.
                            bedrock_messages.append({'role': 'user', 'content': [{'text': part.model_response()}]})
                        else:
                            # Retry for a specific tool call: send as an errored tool result.
                            assert part.tool_call_id is not None
                            bedrock_messages.append(
                                {
                                    'role': 'user',
                                    'content': [
                                        {
                                            'toolResult': {
                                                'toolUseId': part.tool_call_id,
                                                'content': [{'text': part.model_response()}],
                                                'status': 'error',
                                            }
                                        }
                                    ],
                                }
                            )
            elif isinstance(m, ModelResponse):
                content: list[ContentBlockOutputTypeDef] = []
                for item in m.parts:
                    if isinstance(item, TextPart):
                        content.append({'text': item.content})
                    else:
                        assert isinstance(item, ToolCallPart)
                        content.append(self._map_tool_call(item))
                bedrock_messages.append({'role': 'assistant', 'content': content})
            else:
                assert_never(m)
        return system_prompt, bedrock_messages

    @staticmethod
    async def _map_user_prompt(part: UserPromptPart) -> list[MessageUnionTypeDef]:
        """Map a `UserPromptPart` (text, binary, or URL content) to a single Bedrock user message."""
        content: list[ContentBlockUnionTypeDef] = []
        if isinstance(part.content, str):
            content.append({'text': part.content})
        else:
            # Bedrock requires each document to carry a name; generate sequential ones.
            document_count = 0
            for item in part.content:
                if isinstance(item, str):
                    content.append({'text': item})
                elif isinstance(item, BinaryContent):
                    # NOTE(review): `format` shadows the builtin of the same name.
                    format = item.format
                    if item.is_document:
                        document_count += 1
                        name = f'Document {document_count}'
                        assert format in ('pdf', 'txt', 'csv', 'doc', 'docx', 'xls', 'xlsx', 'html', 'md')
                        content.append({'document': {'name': name, 'format': format, 'source': {'bytes': item.data}}})
                    elif item.is_image:
                        assert format in ('jpeg', 'png', 'gif', 'webp')
                        content.append({'image': {'format': format, 'source': {'bytes': item.data}}})
                    else:
                        raise NotImplementedError('Binary content is not supported yet.')
                elif isinstance(item, (ImageUrl, DocumentUrl)):
                    # URL content is fetched here and sent inline as bytes.
                    response = await cached_async_http_client().get(item.url)
                    response.raise_for_status()
                    if item.kind == 'image-url':
                        format = item.media_type.split('/')[1]
                        assert format in ('jpeg', 'png', 'gif', 'webp'), f'Unsupported image format: {format}'
                        image: ImageBlockTypeDef = {'format': format, 'source': {'bytes': response.content}}
                        content.append({'image': image})
                    elif item.kind == 'document-url':
                        document_count += 1
                        name = f'Document {document_count}'
                        data = response.content
                        content.append({'document': {'name': name, 'format': item.format, 'source': {'bytes': data}}})
                elif isinstance(item, AudioUrl):  # pragma: no cover
                    raise NotImplementedError('Audio is not supported yet.')
                else:
                    assert_never(item)
        return [{'role': 'user', 'content': content}]

    @staticmethod
    def _map_tool_call(t: ToolCallPart) -> ContentBlockOutputTypeDef:
        """Convert a pydantic_ai `ToolCallPart` to a Bedrock `toolUse` content block."""
        return {'toolUse': {'toolUseId': _utils.guard_tool_call_id(t=t), 'name': t.tool_name, 'input': t.args_as_dict()}}
The provider to use for authentication and API access. Can be either the string
'bedrock' or an instance of Provider[BaseClient]. If not provided, a new provider will be
created using the other parameters.
Defaults to `'bedrock'`.
Source code in pydantic_ai_slim/pydantic_ai/models/bedrock.py
def __init__(
    self,
    model_name: BedrockModelName,
    *,
    provider: Literal['bedrock'] | Provider[BaseClient] = 'bedrock',
):
    """Initialize a Bedrock model.

    Args:
        model_name: The name of the Bedrock model to use. List of model names available
            [here](https://docs.aws.amazon.com/bedrock/latest/userguide/models-supported.html).
        provider: The provider to use for authentication and API access. Can be either the string
            'bedrock' or an instance of `Provider[BaseClient]`. If not provided, a new provider will be
            created using the other parameters.
    """
    self._model_name = model_name
    # A bare provider name is resolved to a Provider instance; an instance is used as-is.
    if isinstance(provider, str):
        provider = infer_provider(provider)
    self.client = cast('BedrockRuntimeClient', provider.client)
@dataclass
class BedrockStreamedResponse(StreamedResponse):
    """Implementation of `StreamedResponse` for Bedrock models."""

    # Name of the model that produced this stream.
    _model_name: BedrockModelName
    # Raw boto3 ConverseStream event stream being consumed.
    _event_stream: EventStream[ConverseStreamOutputTypeDef]
    # Captured at construction time, before any events are consumed.
    _timestamp: datetime = field(default_factory=_utils.now_utc)

    async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
        """Return an async iterator of [`ModelResponseStreamEvent`][pydantic_ai.messages.ModelResponseStreamEvent]s.

        This method should be implemented by subclasses to translate the vendor-specific stream of events into
        pydantic_ai-format events.
        """
        chunk: ConverseStreamOutputTypeDef
        # Carries the id of the most recently started tool call across chunks, since
        # `contentBlockDelta` events for a tool do not repeat the toolUseId.
        tool_id: str | None = None
        async for chunk in _AsyncIteratorWrapper(self._event_stream):
            # TODO(Marcelo): Switch this to `match` when we drop Python 3.9 support.
            if 'messageStart' in chunk:
                continue  # no pydantic_ai event to emit
            if 'messageStop' in chunk:
                continue  # no pydantic_ai event to emit
            if 'metadata' in chunk:
                # Token usage arrives in metadata chunks; accumulate it on the response.
                if 'usage' in chunk['metadata']:
                    self._usage += self._map_usage(chunk['metadata'])
                continue
            if 'contentBlockStart' in chunk:
                index = chunk['contentBlockStart']['contentBlockIndex']
                start = chunk['contentBlockStart']['start']
                if 'toolUse' in start:
                    tool_use_start = start['toolUse']
                    tool_id = tool_use_start['toolUseId']
                    tool_name = tool_use_start['name']
                    # May return None if the delta produced no user-visible event yet.
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=index,
                        tool_name=tool_name,
                        args=None,
                        tool_call_id=tool_id,
                    )
                    if maybe_event:
                        yield maybe_event
            if 'contentBlockDelta' in chunk:
                index = chunk['contentBlockDelta']['contentBlockIndex']
                delta = chunk['contentBlockDelta']['delta']
                if 'text' in delta:
                    yield self._parts_manager.handle_text_delta(vendor_part_id=index, content=delta['text'])
                if 'toolUse' in delta:
                    tool_use = delta['toolUse']
                    # `tool_id` was set by the matching contentBlockStart chunk above.
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=index,
                        tool_name=tool_use.get('name'),
                        args=tool_use.get('input'),
                        tool_call_id=tool_id,
                    )
                    if maybe_event:
                        yield maybe_event

    @property
    def timestamp(self) -> datetime:
        return self._timestamp

    @property
    def model_name(self) -> str:
        """Get the model name of the response."""
        return self._model_name

    def _map_usage(self, metadata: ConverseStreamMetadataEventTypeDef) -> result.Usage:
        """Convert a Bedrock stream metadata event's usage block into a `result.Usage`."""
        return result.Usage(
            request_tokens=metadata['usage']['inputTokens'],
            response_tokens=metadata['usage']['outputTokens'],
            total_tokens=metadata['usage']['totalTokens'],
        )