xtalk
Xtalk
Defined in xtalk.api.
class Xtalk
Create Xtalk pipelines, services, and session entrypoints.
Notes
Xtalk is the main integration surface used by the sample applications.
It builds pipelines from configuration, stores a prototype service, and
accepts WebSocket sessions on demand.
Class Fields
MODEL_REGISTRY: dict[str, list[ImportSpec]]=SHARED_MODEL_REGISTRY
Methods
__init__
Defined in xtalk.api.
def __init__(self, *, service_prototype: Service, max_sessions: int | None = None)
Initialize an Xtalk application wrapper.
Parameters
service_prototype(Service) Prototype service used to clone per-session service instances.
max_sessions(int | None, optional) Maximum number of concurrent sessions. If omitted, no session limit is enforced.
register_model_search_spec
Defined in xtalk.api.
def register_model_search_spec(cls, *, slot: str, spec: ImportSpec, prepend: bool = True) -> None
Register an additional model lookup location for a slot.
Parameters
slot(str) Registry slot name such as "llm_agent" or "tts".
spec(ImportSpec) Import target to try when loading that slot. Supported forms include module paths, "module:attribute" references, and Python file paths.
prepend(bool, default=True) Whether to try the new spec before existing registered specs.
Notes
Common spec forms include "my_pkg.custom_tts",
"my_pkg.custom_tts:registry", "/abs/path/custom_tts.py", and
Path("./custom_tts.py").
Examples
>>> Xtalk.register_model_search_spec(
... slot="llm_agent",
... spec="./echo_agent.py",
... )
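The effect of prepend on lookup order can be illustrated with a small stand-alone sketch. The registry dictionary and the register_spec helper below are hypothetical stand-ins, not the Xtalk implementation; they assume each slot maps to an ordered list of specs that are tried from first to last.

```python
from pathlib import Path
from typing import Union

# Hypothetical stand-in for an import spec: a module path,
# a "module:attribute" reference, or a Python file path.
ImportSpec = Union[str, Path]


def register_spec(
    registry: dict[str, list[ImportSpec]],
    *,
    slot: str,
    spec: ImportSpec,
    prepend: bool = True,
) -> None:
    """Add ``spec`` to ``slot`` so it is tried first (prepend) or last."""
    specs = registry.setdefault(slot, [])
    if prepend:
        specs.insert(0, spec)
    else:
        specs.append(spec)


registry: dict[str, list[ImportSpec]] = {"tts": ["builtin.tts"]}
register_spec(registry, slot="tts", spec="my_pkg.custom_tts")
register_spec(registry, slot="tts", spec=Path("./fallback_tts.py"), prepend=False)
```

With the default prepend=True, the custom spec is consulted before the built-in one, which is the usual choice when overriding a shipped model.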
from_config
Defined in xtalk.api.
def from_config(cls, path_or_dict: str | dict) -> 'Xtalk'
Build an Xtalk instance from configuration data.
Parameters
path_or_dict(str | dict) JSON file path or already loaded configuration dictionary.
Returns
(Xtalk) Configured application wrapper backed by a DefaultPipeline and a DefaultService.
Examples
>>> xtalk = Xtalk.from_config("server_config.json")
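Because path_or_dict accepts either form, caller code that wants to inspect or patch the configuration first can normalise both inputs into a dictionary. The load_config helper below is a hypothetical sketch, not part of Xtalk, and assumes the file holds a single JSON object.

```python
import json
from pathlib import Path
from typing import Any, Union


def load_config(path_or_dict: Union[str, dict]) -> dict[str, Any]:
    """Return a configuration dict from a JSON file path or a loaded dict."""
    if isinstance(path_or_dict, dict):
        # Shallow copy so later edits do not mutate the caller's dictionary.
        return dict(path_or_dict)
    with Path(path_or_dict).open(encoding="utf-8") as handle:
        config = json.load(handle)
    if not isinstance(config, dict):
        raise ValueError("configuration root must be a JSON object")
    return config
```

The resulting dictionary can then be passed to from_config in place of the file path.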
create_pipeline_from_config
Defined in xtalk.api.
def create_pipeline_from_config(cls, *, pipeline_cls: Type[Pipeline], config_path_or_dict: str | dict, additional_model_registry: dict[str, Any]) -> Pipeline
Instantiate a custom pipeline class from configuration.
Parameters
pipeline_cls(Type[Pipeline]) Concrete pipeline type to instantiate.
config_path_or_dict(str | dict) JSON file path or already loaded configuration dictionary.
additional_model_registry(dict[str, Any]) Extra slot-to-instance mapping merged on top of the default model registry before the pipeline is created.
Returns
(Pipeline) Pipeline instance created from the supplied configuration.
Examples
>>> pipeline = Xtalk.create_pipeline_from_config(
... pipeline_cls=DefaultPipeline,
... config_path_or_dict="server_config.json",
... additional_model_registry={},
... )
set_session_limit
Defined in xtalk.api.
def set_session_limit(self, limit: int)
Set or replace the concurrent session limit.
Parameters
limit(int) Maximum number of active sessions allowed at the same time.
embed_text
Defined in xtalk.api.
async def embed_text(self, session_id: str, text: str, user_id: str | None = None)
Queue text for session-scoped embedding storage.
Parameters
session_id(str) Session identifier returned to the frontend.
text(str) Text content that should be embedded and persisted for retrieval.
Raises
(ValueError) Raised if the target session does not exist.
add_agent_tools
Defined in xtalk.api.
def add_agent_tools(self, tools_or_factories: list[BaseTool | Callable[[], BaseTool]])
Attach tools to the prototype agent before sessions are created.
Parameters
tools_or_factories(list[BaseTool | Callable[[], BaseTool]]) Tool instances or zero-argument factories that produce tool instances.
Raises
(RuntimeError) Raised if at least one service session has already been created.
mount_routes
Defined in xtalk.api.
def mount_routes(self, app: Any, *, login_path: str = '/api/auth/login', sessions_path: str = '/api/sessions', session_detail_path: str = '/api/sessions/{session_id}', upload_path: str = '/api/upload', ws_path: str = '/ws') -> None
Mount the built-in auth, session, upload, and websocket routes.
connect
Defined in xtalk.api.
async def connect(self, websocket: WebSocket, user_id: str | None = None)
Accept a WebSocket session and hand it to the service manager.
Parameters
websocket(WebSocket) FastAPI WebSocket connection from the client.
user_id(str | None, optional) Authenticated user identifier. When omitted, the connection falls back to the legacy connection-scoped session behavior.
Notes
If a session limit is configured, the socket is first admitted through the session limiter queue.
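The admission step can be pictured as a bounded gate that sessions wait on before they start. The sketch below uses asyncio.Semaphore as a stand-in; the SessionGate class and run_session function are hypothetical, and the actual Xtalk limiter queue may behave differently.

```python
import asyncio
from contextlib import asynccontextmanager


class SessionGate:
    """Hypothetical gate that admits at most ``limit`` sessions at a time."""

    def __init__(self, limit: int) -> None:
        self._slots = asyncio.Semaphore(limit)
        self.active = 0
        self.peak = 0

    @asynccontextmanager
    async def admit(self):
        # Waiters queue here until a running session releases its slot.
        async with self._slots:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                yield
            finally:
                self.active -= 1


async def run_session(gate: SessionGate) -> None:
    async with gate.admit():
        await asyncio.sleep(0.01)  # placeholder for the session message loop


async def main() -> int:
    gate = SessionGate(limit=2)
    await asyncio.gather(*(run_session(gate) for _ in range(5)))
    return gate.peak


peak = asyncio.run(main())
```

Five sessions are requested, but the recorded peak never exceeds the configured limit of two; the remaining sessions wait until a slot is released.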
Pipeline
Defined in xtalk.pipelines.interfaces.
class Pipeline(ABC)
Define the model accessors expected by Xtalk services.
Notes
Concrete pipelines expose the models and helpers consumed by service
managers. Sample applications commonly subclass DefaultPipeline to add
extra components while retaining this interface.
Methods
clone
Defined in xtalk.pipelines.interfaces.
def clone(self) -> 'Pipeline'
Clone the pipeline for a new session.
Returns
(Pipeline) Session-safe pipeline instance.
get_asr_model
Defined in xtalk.pipelines.interfaces.
def get_asr_model(self) -> ASR | None
Return the configured ASR model, if available.
Returns
(ASR | None) ASR implementation, or None when the pipeline omits speech recognition.
get_tts_model
Defined in xtalk.pipelines.interfaces.
def get_tts_model(self) -> TTS | None
Return the configured TTS model, if available.
Returns
(TTS | None) TTS implementation or None.
get_agent
Defined in xtalk.pipelines.interfaces.
def get_agent(self) -> Agent | None
Return the configured agent, if available.
Returns
(Agent | None) Agent implementation or None.
get_captioner_model
Defined in xtalk.pipelines.interfaces.
def get_captioner_model(self) -> Captioner | None
Return the configured captioner, if available.
Returns
(Captioner | None) Captioner implementation or None.
get_punt_restorer_model
Defined in xtalk.pipelines.interfaces.
def get_punt_restorer_model(self) -> PuntRestorer | None
Return the configured punctuation restorer, if available.
Returns
(PuntRestorer | None) Punctuation restoration model or None.
get_caption_rewriter_model
Defined in xtalk.pipelines.interfaces.
def get_caption_rewriter_model(self) -> Rewriter | None
Return the caption rewriter used by caption-related managers.
Returns
(Rewriter | None) Caption rewriter or None.
get_vad_model
Defined in xtalk.pipelines.interfaces.
def get_vad_model(self) -> VAD
Return the configured voice activity detector, if available.
Returns
(VAD | None) VAD implementation or None.
get_enhancer_model
Defined in xtalk.pipelines.interfaces.
def get_enhancer_model(self) -> SpeechEnhancer | None
Return the configured speech enhancer, if available.
Returns
(SpeechEnhancer | None) Speech enhancement model or None.
get_speaker_encoder
Defined in xtalk.pipelines.interfaces.
def get_speaker_encoder(self) -> SpeakerEncoder | None
Return the configured speaker encoder, if available.
Returns
(SpeakerEncoder | None) Speaker encoder or None.
get_speed_controller
Defined in xtalk.pipelines.interfaces.
def get_speed_controller(self) -> SpeechSpeedController | None
Return the optional TTS speed controller.
Returns
(SpeechSpeedController | None) Speed controller or None.
get_embeddings_model
Defined in xtalk.pipelines.interfaces.
def get_embeddings_model(self) -> Embeddings | None
Return the embeddings model used for retrieval features.
Returns
(Embeddings | None) Embeddings model or None.
get_turn_detector_model
Defined in xtalk.pipelines.interfaces.
def get_turn_detector_model(self) -> TurnDetector | None
Return the configured turn detector, if available.
Returns
(TurnDetector | None) Turn detector or None.
DefaultPipeline
Defined in xtalk.pipelines.default.
@dataclass(init=False)
class DefaultPipeline(Pipeline)
Store the standard Xtalk model bundle for a session.
Parameters
asr(ASR) Speech recognition model.
llm_agent(Agent) Agent used to generate text responses and tool calls.
tts(TTS) Text-to-speech model.
default_response(str, optional) Fallback text returned when no better response is available.
use_streaming_tts(bool, optional) Whether service managers should prefer streaming TTS when supported.
captioner(Captioner | None, optional) Optional audio captioning model.
punt_restorer_model(PuntRestorer | None, optional) Optional punctuation restoration model.
caption_rewriter(Rewriter | BaseChatModel | None, optional) Optional caption rewriter or chat model to wrap as a rewriter.
vad(VAD | None, optional) Optional voice activity detector.
speech_enhancer(SpeechEnhancer | None, optional) Optional speech enhancement model.
speaker_encoder(SpeakerEncoder | None, optional) Optional speaker embedding model.
speech_speed_controller(SpeechSpeedController | None, optional) Optional post-processing speed controller for synthesized audio.
embeddings(Embeddings | None, optional) Optional embeddings backend for retrieval features.
turn_detector(TurnDetector | None, optional) Optional turn detector that coordinates interruption and generation.
Notes
The dataclass field metadata controls how instances are cloned for each
session. Subclasses can add new fields as long as they expose an
init_key metadata entry.
Class Fields
asr_model: ASR=field(metadata={'init_key': 'asr', 'clone': True})
llm_agent: Agent=field(metadata={'init_key': 'llm_agent', 'clone': True})
tts_model: TTS=field(metadata={'init_key': 'tts', 'clone': True})
default_response: str=field(default="Sorry, I didn't catch that. Could you please say it again?", metadata={'init_key': 'default_response', 'clone': False})
use_streaming_tts: bool=field(default=True, metadata={'init_key': 'use_streaming_tts', 'clone': False})
captioner_model: Optional[Captioner]=field(default=None, metadata={'init_key': 'captioner', 'clone': False})
punt_restorer_model: Optional[PuntRestorer]=field(default=None, metadata={'init_key': 'punt_restorer_model', 'clone': False})
caption_rewriter: Optional[Rewriter]=field(default=None, metadata={'init_key': 'caption_rewriter', 'clone': False})
vad_model: Optional[VAD]=field(default=None, metadata={'init_key': 'vad', 'clone': True})
enhancer_model: Optional[SpeechEnhancer]=field(default=None, metadata={'init_key': 'speech_enhancer', 'clone': True})
speaker_encoder: Optional[SpeakerEncoder]=field(default=None, metadata={'init_key': 'speaker_encoder', 'clone': False})
embeddings_model: Optional[Embeddings]=field(default=None, metadata={'init_key': 'embeddings', 'clone': False})
turn_detector_model: Optional[TurnDetector]=field(default=None, metadata={'init_key': 'turn_detector', 'clone': True})
Methods
__init__
Defined in xtalk.pipelines.default.
def __init__(self, asr: ASR, llm_agent: Agent, tts: TTS, default_response: str = "Sorry, I didn't catch that. Could you please say it again?", use_streaming_tts: bool = True, captioner: Optional[Captioner] = None, punt_restorer_model: Optional[PuntRestorer] = None, caption_rewriter: Optional[Rewriter | BaseChatModel] = None, vad: Optional[VAD] = None, speech_enhancer: Optional[SpeechEnhancer] = None, speaker_encoder: Optional[SpeakerEncoder] = None, speech_speed_controller: Optional[SpeechSpeedController] = None, embeddings: Optional[Embeddings] = None, turn_detector: Optional[TurnDetector] = None)
Initialize the default pipeline.
Parameters
asr(ASR) Speech recognition model.
llm_agent(Agent) Agent used for response generation.
tts(TTS) Text-to-speech model.
default_response(str, optional) Fallback response text.
use_streaming_tts(bool, optional) Whether to prefer streaming TTS paths.
captioner(Captioner | None, optional) Optional audio captioning model.
punt_restorer_model(PuntRestorer | None, optional) Optional punctuation restoration model.
caption_rewriter(Rewriter | BaseChatModel | None, optional) Optional caption rewriter or chat model.
vad(VAD | None, optional) Optional voice activity detector.
speech_enhancer(SpeechEnhancer | None, optional) Optional speech enhancer.
speaker_encoder(SpeakerEncoder | None, optional) Optional speaker encoder.
speech_speed_controller(SpeechSpeedController | None, optional) Optional speed controller for TTS output.
embeddings(Embeddings | None, optional) Optional embeddings backend.
turn_detector(TurnDetector | None, optional) Optional turn detector.
clone
Defined in xtalk.pipelines.default.
def clone(self)
Clone the pipeline according to field metadata.
Returns
(DefaultPipeline) New pipeline instance of the same concrete type.
Notes
Fields marked with clone=True call their own clone() method when
available. Remaining fields are shared by reference.
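The metadata-driven behavior described in these notes can be sketched with plain dataclasses. SketchPipeline and FakeModel below are hypothetical stand-ins, not Xtalk classes; the sketch assumes only what the notes state, namely that fields marked clone=True are cloned when the value has a clone() method and that all other fields are shared by reference. For brevity it relies on the generated __init__, so the init_key entries are carried but not used.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


class FakeModel:
    """Stand-in model that supports per-session cloning."""

    def clone(self) -> "FakeModel":
        return FakeModel()


@dataclass
class SketchPipeline:
    # Hypothetical fields; the metadata keys mirror the documented ones.
    asr_model: FakeModel = field(metadata={"init_key": "asr", "clone": True})
    captioner_model: Optional[FakeModel] = field(
        default=None, metadata={"init_key": "captioner", "clone": False}
    )

    def clone(self) -> "SketchPipeline":
        kwargs = {}
        for f in fields(self):
            value = getattr(self, f.name)
            # Clone only when the field opts in and the value supports it.
            if f.metadata.get("clone") and hasattr(value, "clone"):
                value = value.clone()
            kwargs[f.name] = value
        # type(self) keeps the concrete subclass type.
        return type(self)(**kwargs)


shared = FakeModel()
original = SketchPipeline(asr_model=FakeModel(), captioner_model=shared)
session_copy = original.clone()
```

Here session_copy receives a fresh asr_model, while captioner_model is the same object as in the original, which is the trade-off to keep in mind when a shared model holds mutable state.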
get_asr_model
Defined in xtalk.pipelines.default.
def get_asr_model(self) -> ASR | None
get_tts_model
Defined in xtalk.pipelines.default.
def get_tts_model(self) -> TTS | None
get_agent
Defined in xtalk.pipelines.default.
def get_agent(self) -> Agent | None
get_captioner_model
Defined in xtalk.pipelines.default.
def get_captioner_model(self)
get_punt_restorer_model
Defined in xtalk.pipelines.default.
def get_punt_restorer_model(self)
get_caption_rewriter_model
Defined in xtalk.pipelines.default.
def get_caption_rewriter_model(self)
get_vad_model
Defined in xtalk.pipelines.default.
def get_vad_model(self)
get_enhancer_model
Defined in xtalk.pipelines.default.
def get_enhancer_model(self)
get_speaker_encoder
Defined in xtalk.pipelines.default.
def get_speaker_encoder(self)
get_speed_controller
Defined in xtalk.pipelines.default.
def get_speed_controller(self) -> SpeechSpeedController | None
get_embeddings_model
Defined in xtalk.pipelines.default.
def get_embeddings_model(self) -> Embeddings | None
get_turn_detector_model
Defined in xtalk.pipelines.default.
def get_turn_detector_model(self) -> TurnDetector | None
set_tts_model
Defined in xtalk.pipelines.default.
def set_tts_model(self, model_type: str, config: dict) -> None
Switch the active TTS model at runtime.
Parameters
model_type(str) Supported model family name, currently "IndexTTS" or "IndexTTS2".
config(dict) Runtime configuration passed to the new TTS model constructor.
Raises
(ValueError) Raised if model_type is not supported.
set_llm_model
Defined in xtalk.pipelines.default.
def set_llm_model(self, model: str, base_url: str = '', api_key: str = '', extra_body: dict | None = None) -> None
Replace the current agent LLM with a ChatOpenAI instance.
Parameters
model(str) Target model name passed to ChatOpenAI.
base_url(str, optional) Override for the OpenAI-compatible API base URL.
api_key(str, optional) API key used for the replacement model. Falls back to OPENAI_API_KEY when omitted.
extra_body(dict | None, optional) Additional request payload fields forwarded to ChatOpenAI.
Service
Defined in xtalk.serving.service.
class Service
Orchestrate a session-scoped pipeline and manager stack.
Parameters
pipeline(Pipeline) Pipeline prototype that will be cloned for the session.
service_config(dict[str, Any] | None, optional) Session configuration shared with managers and gateways.
manager_classes(list[Type[Manager]] | None, optional) Manager classes to instantiate for live sessions.
_websocket(WebSocket | None, optional) Internal WebSocket handle for live sessions. None means the instance acts as a prototype only.
_event_overrides(dict[Type[EventListenerMixin], EventOverrides] | None, optional) Internal event subscription overrides copied into cloned sessions.
Methods
__init__
Defined in xtalk.serving.service.
def __init__(self, *, pipeline: Pipeline, service_config: dict[str, Any] | None = None, manager_classes: list[Type[Manager]] | None = None, _websocket: WebSocket | None = None, _session_id: str | None = None, _event_overrides: dict[Type[EventListenerMixin], EventOverrides] | None = None)
unsubscribe_event
Defined in xtalk.serving.service.
def unsubscribe_event(self, *, event_listener_cls: Type[EventListenerMixin], event_type: Type[BaseEvent], method_name: str | None = None) -> None
Disable an automatic event subscription for a listener class.
Parameters
event_listener_cls(Type[EventListenerMixin]) Listener class whose subscription should be disabled.
event_type(Type[BaseEvent]) Event type to unsubscribe.
method_name(str | None, optional) Specific method name to disable. If omitted, every handler for the event is disabled for the listener class.
subscribe_event
Defined in xtalk.serving.service.
def subscribe_event(self, *, event_listener_cls: Type[EventListenerMixin], event_type: Type[BaseEvent], method_or_handler: str | Callable[[EventListenerMixin, BaseEvent], Any] | Callable[[BaseEvent], Any] | Callable[[EventListenerMixin, BaseEvent], Coroutine[Any, Any, Any]] | Callable[[BaseEvent], Coroutine[Any, Any, Any]], priority: int = 0, enabled_if: Callable[[EventListenerMixin], bool] | None = None) -> None
Register an additional event subscription override.
Parameters
event_listener_cls(Type[EventListenerMixin]) Listener class that should receive the event.
event_type(Type[BaseEvent]) Event type to subscribe to.
method_or_handler(str | Callable) Method name on the listener instance or an external sync/async handler accepting event or (listener, event).
priority(int, optional) Subscription priority. Higher values run first.
enabled_if(Callable[[EventListenerMixin], bool] | None, optional) Predicate used to decide whether the subscription should be installed for a concrete listener instance.
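The three accepted shapes of method_or_handler can be reduced to a single one-argument callable. The resolve_handler function below is a hypothetical sketch of one way to do that by counting positional parameters; it is an assumption for illustration and not a description of how Xtalk resolves handlers internally.

```python
import inspect
from typing import Any, Callable, Union


def resolve_handler(
    listener: Any, method_or_handler: Union[str, Callable[..., Any]]
) -> Callable[[Any], Any]:
    """Return a one-argument callable that handles an event for ``listener``."""
    if isinstance(method_or_handler, str):
        # A string names a method on the listener instance.
        return getattr(listener, method_or_handler)
    positional = [
        p
        for p in inspect.signature(method_or_handler).parameters.values()
        if p.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    if len(positional) >= 2:
        # (listener, event) handlers get the listener bound in.
        return lambda event: method_or_handler(listener, event)
    return method_or_handler


class SketchListener:
    name = "listener"

    def on_event(self, event: str) -> str:
        return f"method:{event}"


def external(event: str) -> str:
    return f"external:{event}"


def external_with_listener(listener: SketchListener, event: str) -> str:
    return f"{listener.name}:{event}"


listener = SketchListener()
results = [
    resolve_handler(listener, target)("ping")
    for target in ("on_event", external, external_with_listener)
]
```

Async handlers pass through unchanged in this sketch: calling the resolved callable returns a coroutine that the dispatcher would need to await.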
register_manager
Defined in xtalk.serving.service.
def register_manager(self, manager_cls: Type[Manager])
Register a manager class on the service.
Parameters
manager_cls(Type[Manager]) Manager class to add to the service prototype or live session.
unregister_manager
Defined in xtalk.serving.service.
def unregister_manager(self, manager_cls: Type[Manager])
Remove a manager class from the service.
Parameters
manager_cls(Type[Manager]) Manager class to remove.
handle_message_loop
Defined in xtalk.serving.service.
async def handle_message_loop(self, already_accepted: bool = False) -> None
Run the full WebSocket message loop for a live session.
Parameters
already_accepted(bool, optional) Whether the WebSocket has already been accepted by an upstream limiter or caller.
Raises
(RuntimeError) Raised if the service instance is still a prototype without runtime gateways.
restore_conversation
Defined in xtalk.serving.service.
def restore_conversation(self, *, messages: list[dict[str, Any]]) -> None
Restore persisted conversation history into the session agent.
send_session_attached
Defined in xtalk.serving.service.
async def send_session_attached(self) -> None
Notify the client that the session is attached.
stop
Defined in xtalk.serving.service.
async def stop(self) -> None
Stop the service and shut down all managers.
clone
Defined in xtalk.serving.service.
def clone(self, new_websocket: WebSocket, *, session_id: str | None = None, service_config_overrides: dict[str, Any] | None = None) -> 'Service'
Clone the service prototype for a new WebSocket session.
Parameters
new_websocket(WebSocket) WebSocket assigned to the new live session.
Returns
(Service) Cloned service instance of the same concrete type.
DefaultService
Defined in xtalk.serving.service.
class DefaultService(Service)
Convenience Service with the standard Xtalk manager stack.
Notes
Sample applications usually instantiate DefaultService directly and then
register or override managers for custom behavior.
Class Fields
MANAGER_CLASSES: list[Type[Manager]]=[ASRManager, LLMAgentContextManager, LLMAgentConsumptionManager, DirectAudioManager, TTSManager, TTSPlaybackManager, CaptionerManager, RetrievalManager, TurnTakingManager, LatencyManager, VADManager, EnhancerManager, SpeakerManager, EmbeddingsManager, RecordingManager, TurnDetectorManager]
Methods
__init__
Defined in xtalk.serving.service.
def __init__(self, *, pipeline: Pipeline, service_config: dict[str, Any] | None = None, manager_classes: list[Type[Manager]] | None = None, _websocket: WebSocket | None = None, _session_id: str | None = None, _event_overrides: dict[Type[EventListenerMixin], EventOverrides] | None = None)
BaseEvent
Defined in xtalk.serving.events.
@dataclass
class BaseEvent
Base dataclass for all Xtalk events.
Parameters
session_id(str) Session identifier associated with the event.
Attributes
timestamp(float) Unix timestamp recorded when the event instance is created.
session_id(str) Session identifier associated with the event.
TYPE(str) Stable event type string used by the event bus.
Class Fields
timestamp: float=field(init=False)
session_id: str
TYPE: ClassVar[str]='base'
Methods
event_type
Defined in xtalk.serving.events.
def event_type(self) -> str
create_event_class
Defined in xtalk.serving.events.
def create_event_class(*, name: str, fields: dict[str, Any] | None = None, type_name: str | None = None) -> Type[BaseEvent]
Create a BaseEvent subclass dynamically.
Parameters
name(str) Dataclass name for the generated event type.
fields(dict[str, Any] | None, optional) Mapping of field names to default values. Value types are inferred from the defaults.
type_name(str | None, optional) Event bus type string. Defaults to name.lower() when omitted.
Returns
(Type[BaseEvent]) Generated dataclass type inheriting from BaseEvent.
Examples
>>> CustomEvent = create_event_class(
... name="CustomEvent",
... fields={"text": ""},
... )
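The documented behavior (field types inferred from defaults, type string defaulting to name.lower()) can be approximated with dataclasses.make_dataclass. SketchBaseEvent and make_event_class below are hypothetical stand-ins written for illustration; the real create_event_class may be implemented differently, and this sketch only handles immutable defaults such as strings and numbers.

```python
import time
from dataclasses import dataclass, field, make_dataclass
from typing import Any, ClassVar, Optional, Type


@dataclass
class SketchBaseEvent:
    """Stand-in for BaseEvent with the documented fields."""

    session_id: str
    timestamp: float = field(init=False)
    TYPE: ClassVar[str] = "base"

    def __post_init__(self) -> None:
        # Record the creation time of the event instance.
        self.timestamp = time.time()


def make_event_class(
    *,
    name: str,
    fields: Optional[dict[str, Any]] = None,
    type_name: Optional[str] = None,
) -> Type[SketchBaseEvent]:
    """Build a SketchBaseEvent subclass whose field types come from defaults."""
    specs = [(key, type(default), default) for key, default in (fields or {}).items()]
    return make_dataclass(
        name,
        specs,
        bases=(SketchBaseEvent,),
        namespace={"TYPE": type_name or name.lower()},
    )


CustomEvent = make_event_class(name="CustomEvent", fields={"text": ""})
event = CustomEvent(session_id="s1", text="hello")
```

In this sketch, instances of the generated class still receive a timestamp from the base class, and the TYPE string falls back to the lowercased class name when type_name is omitted.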
Manager
Defined in xtalk.serving.interfaces.
class Manager(EventListenerMixin, ShutdownMixin)
Base class for Xtalk managers.
Notes
Subclasses typically accept event_bus, session_id, pipeline, and
config arguments, then register handlers with @Manager.event_handler.
Methods
event_handler
Defined in xtalk.serving.interfaces.
def event_handler(event_type: Type[BaseEvent], *, priority: int = 0, enabled_if: Callable[['EventListenerMixin'], bool] | None = None)
Declare a manager event handler.
Parameters
event_type(Type[BaseEvent]) Event class handled by the decorated method.
priority(int, optional) Execution priority for the handler. Higher values run first.
enabled_if(Callable[[EventListenerMixin], bool] | None, optional) Predicate evaluated against the manager instance before the handler is registered.
Returns
(Callable) Decorator that marks the method for automatic subscription.
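A marking decorator of this kind is commonly built by attaching metadata to the function and collecting it later from the instance. The sketch below is a self-contained illustration of that pattern; the _MARKER attribute name, collect_handlers, TextEvent, and SketchManager are all hypothetical and do not reflect Xtalk internals.

```python
import inspect
from typing import Any, Callable, Optional

_MARKER = "_sketch_event_handlers"  # hypothetical attribute name


def event_handler(
    event_type: type,
    *,
    priority: int = 0,
    enabled_if: Optional[Callable[[Any], bool]] = None,
) -> Callable:
    """Mark a method so a listener can subscribe it automatically later."""

    def decorator(func: Callable) -> Callable:
        marks = getattr(func, _MARKER, [])
        marks.append((event_type, priority, enabled_if))
        setattr(func, _MARKER, marks)
        return func

    return decorator


def collect_handlers(listener: Any) -> list:
    """Return (event_type, priority, bound_method) for enabled handlers."""
    found = []
    for name, member in inspect.getmembers(type(listener)):
        for event_type, priority, enabled_if in getattr(member, _MARKER, []):
            # The predicate is evaluated against the concrete instance.
            if enabled_if is None or enabled_if(listener):
                found.append((event_type, priority, getattr(listener, name)))
    # Higher priority values come first.
    return sorted(found, key=lambda item: -item[1])


class TextEvent:
    pass


class SketchManager:
    verbose = False

    @event_handler(TextEvent, priority=5)
    def on_text(self, event: TextEvent) -> str:
        return "handled"

    @event_handler(TextEvent, enabled_if=lambda manager: manager.verbose)
    def on_text_verbose(self, event: TextEvent) -> str:
        return "verbose"


handlers = collect_handlers(SketchManager())
```

Only on_text is collected here, because the enabled_if predicate on on_text_verbose evaluates to False for this instance.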
EventBus
Defined in xtalk.serving.event_bus.
class EventBus
Publish and subscribe session events with async dispatch support.
Parameters
enable_history(bool, optional) Whether to store published events in memory for later inspection.
max_history(int, optional) Maximum number of events kept when history is enabled.
Class Fields
MAX_ERROR_EVENT_DEPTH=3
ERROR_EVENT_COOLDOWN=1.0
ERROR_EVENT_RATE_LIMIT=10
Methods
__init__
Defined in xtalk.serving.event_bus.
def __init__(self, enable_history: bool = False, max_history: int = 1000)
Initialize the event bus.
Parameters
enable_history(bool, optional) Whether to record published events in the in-memory history buffer.
max_history(int, optional) Maximum number of events retained in history.
subscribe
Defined in xtalk.serving.event_bus.
def subscribe(self, event_class: Union[Type[BaseEvent], str], handler: Callable[[BaseEvent], Any], priority: int = 0) -> None
Subscribe a handler to an event type.
Parameters
event_class(Type[BaseEvent] | str) Event class or event type string such as "tts.started".
handler(Callable[[BaseEvent], Any]) Sync or async callable invoked for matching events.
priority(int, optional) Higher values run earlier.
unsubscribe
Defined in xtalk.serving.event_bus.
def unsubscribe(self, event_class: Union[Type[BaseEvent], str], handler: Callable) -> bool
Unsubscribe a handler from an event type.
Parameters
event_class(Type[BaseEvent] | str) Event class or event type string.
handler(Callable) Previously subscribed handler to remove.
Returns
(bool) True if the handler was removed, otherwise False.
publish
Defined in xtalk.serving.event_bus.
async def publish(self, event: BaseEvent, wait_for_completion: bool = False) -> bool
Publish an event to all matching handlers.
Parameters
event(BaseEvent) Event instance to dispatch.
wait_for_completion(bool, optional) Whether to await handler completion before returning.
Returns
(bool) True on success, otherwise False.
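The subscribe, unsubscribe, and publish contract (priority ordering, sync or async handlers, boolean results) can be illustrated with a minimal stand-alone bus. SketchBus below is a hypothetical simplification: it keys handlers by type string only and always awaits handlers before returning, whereas the real EventBus exposes wait_for_completion and additional features such as history.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable


class SketchBus:
    """Hypothetical bus: handlers with higher priority are invoked first."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[int, Callable[[Any], Any]]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[Any], Any], priority: int = 0) -> None:
        self._handlers[event_type].append((priority, handler))
        # Stable sort keeps registration order among equal priorities.
        self._handlers[event_type].sort(key=lambda item: -item[0])

    def unsubscribe(self, event_type: str, handler: Callable[[Any], Any]) -> bool:
        before = len(self._handlers[event_type])
        self._handlers[event_type] = [
            item for item in self._handlers[event_type] if item[1] is not handler
        ]
        return len(self._handlers[event_type]) < before

    async def publish(self, event_type: str, payload: Any) -> bool:
        try:
            for _, handler in list(self._handlers[event_type]):
                result = handler(payload)
                if inspect.isawaitable(result):
                    await result  # async handlers are awaited in order
            return True
        except Exception:
            return False


calls: list[str] = []


def low(payload: str) -> None:
    calls.append(f"low:{payload}")


async def high(payload: str) -> None:
    calls.append(f"high:{payload}")


bus = SketchBus()
bus.subscribe("tts.started", low, priority=0)
bus.subscribe("tts.started", high, priority=10)
ok = asyncio.run(bus.publish("tts.started", "x"))
removed = bus.unsubscribe("tts.started", low)
```

Although low was subscribed first, high runs before it because of its larger priority value.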
get_history
Defined in xtalk.serving.event_bus.
def get_history(self, event_type: Optional[str] = None, session_id: Optional[str] = None) -> List[BaseEvent]
Retrieve event history with optional filters.
Parameters
event_type(str | None, optional) Filter by event type.
session_id(str | None, optional) Filter by session ID.
Returns
(List[BaseEvent]) List of events.
get_stats
Defined in xtalk.serving.event_bus.
def get_stats(self) -> Dict[str, Any]
Return current event bus statistics.
Returns
(Dict[str, Any]) Dictionary of statistics.
clear_history
Defined in xtalk.serving.event_bus.
def clear_history(self) -> None
Clear event history.
reset_error_tracking
Defined in xtalk.serving.event_bus.
def reset_error_tracking(self) -> None
Reset error event tracking (useful for testing).
shutdown
Defined in xtalk.serving.event_bus.
async def shutdown(self) -> None
Shut down the event bus and release resources.