Skip to content

xtalk

Xtalk

Defined in xtalk.api.

class Xtalk

Create Xtalk pipelines, services, and session entrypoints.

Notes

Xtalk is the main integration surface used by the sample applications. It builds pipelines from configuration, stores a prototype service, and accepts WebSocket sessions on demand.

Class Fields

  • MODEL_REGISTRY: dict[str, list[ImportSpec]] = SHARED_MODEL_REGISTRY

Methods

init

Defined in xtalk.api.

def __init__(self, *, service_prototype: Service, max_sessions: int | None = None)

Initialize an Xtalk application wrapper.

Parameters
  • service_prototype (Service) Prototype service used to clone per-session service instances.
  • max_sessions (int | None, optional) Maximum number of concurrent sessions. If omitted, no session limit is enforced.

register_model_search_spec

Defined in xtalk.api.

def register_model_search_spec(cls, *, slot: str, spec: ImportSpec, prepend: bool = True) -> None

Register an additional model lookup location for a slot.

Parameters
  • slot (str) Registry slot name such as "llm_agent" or "tts".
  • spec (ImportSpec) Import target to try when loading that slot. Supported forms include module paths, "module:attribute" references, and Python file paths.
  • prepend (bool, default=True) Whether to try the new spec before existing registered specs.
Notes

Common spec forms include "my_pkg.custom_tts", "my_pkg.custom_tts:registry", "/abs/path/custom_tts.py", and Path("./custom_tts.py").

Examples
>>> Xtalk.register_model_search_spec(
...     slot="llm_agent",
...     spec="./echo_agent.py",
... )

from_config

Defined in xtalk.api.

def from_config(cls, path_or_dict: str | dict) -> 'Xtalk'

Build an Xtalk instance from configuration data.

Parameters
  • path_or_dict (str | dict) JSON file path or already loaded configuration dictionary.
Returns
  • Xtalk Configured application wrapper backed by a DefaultPipeline and DefaultService.
Examples
>>> xtalk = Xtalk.from_config("server_config.json")

create_pipeline_from_config

Defined in xtalk.api.

def create_pipeline_from_config(cls, *, pipeline_cls: Type[Pipeline], config_path_or_dict: str | dict, additional_model_registry: dict[str, Any]) -> Pipeline

Instantiate a custom pipeline class from configuration.

Parameters
  • pipeline_cls (Type[Pipeline]) Concrete pipeline type to instantiate.
  • config_path_or_dict (str | dict) JSON file path or already loaded configuration dictionary.
  • additional_model_registry (dict[str, Any]) Extra slot-to-instance mapping merged on top of the default model registry before the pipeline is created.
Returns
  • Pipeline Pipeline instance created from the supplied configuration.
Examples
>>> pipeline = Xtalk.create_pipeline_from_config(
...     pipeline_cls=DefaultPipeline,
...     config_path_or_dict="server_config.json",
...     additional_model_registry={},
... )

set_session_limit

Defined in xtalk.api.

def set_session_limit(self, limit: int)

Set or replace the concurrent session limit.

Parameters
  • limit (int) Maximum number of active sessions allowed at the same time.

embed_text

Defined in xtalk.api.

async def embed_text(self, session_id: str, text: str, user_id: str | None = None)

Queue text for session-scoped embedding storage.

Parameters
  • session_id (str) Session identifier returned to the frontend.
  • text (str) Text content that should be embedded and persisted for retrieval.
Raises
  • ValueError Raised if the target session does not exist.

add_agent_tools

Defined in xtalk.api.

def add_agent_tools(self, tools_or_factories: list[BaseTool | Callable[[], BaseTool]])

Attach tools to the prototype agent before sessions are created.

Parameters
  • tools_or_factories (list[BaseTool | Callable[[], BaseTool]]) Tool instances or zero-argument factories that produce tool instances.
Raises
  • RuntimeError Raised if at least one service session has already been created.

mount_routes

Defined in xtalk.api.

def mount_routes(self, app: Any, *, login_path: str = '/api/auth/login', sessions_path: str = '/api/sessions', session_detail_path: str = '/api/sessions/{session_id}', upload_path: str = '/api/upload', ws_path: str = '/ws') -> None

Mount the built-in auth, session, upload, and websocket routes.

connect

Defined in xtalk.api.

async def connect(self, websocket: WebSocket, user_id: str | None = None)

Accept a WebSocket session and hand it to the service manager.

Parameters
  • websocket (WebSocket) FastAPI WebSocket connection from the client.
  • user_id (str | None, optional) Authenticated user identifier. When omitted, the connection falls back to the legacy connection-scoped session behavior.
Notes

If a session limit is configured, the socket is first admitted through the session limiter queue.

Pipeline

Defined in xtalk.pipelines.interfaces.

class Pipeline(ABC)

Define the model accessors expected by Xtalk services.

Notes

Concrete pipelines expose the models and helpers consumed by service managers. Sample applications commonly subclass DefaultPipeline to add extra components while retaining this interface.

Methods

clone

Defined in xtalk.pipelines.interfaces.

def clone(self) -> 'Pipeline'

Clone the pipeline for a new session.

Returns
  • Pipeline Session-safe pipeline instance.

get_asr_model

Defined in xtalk.pipelines.interfaces.

def get_asr_model(self) -> ASR | None

Return the configured ASR model, if available.

Returns
  • ASR | None ASR implementation or None when the pipeline omits speech recognition.

get_tts_model

Defined in xtalk.pipelines.interfaces.

def get_tts_model(self) -> TTS | None

Return the configured TTS model, if available.

Returns
  • TTS | None TTS implementation or None.

get_agent

Defined in xtalk.pipelines.interfaces.

def get_agent(self) -> Agent | None

Return the configured agent, if available.

Returns
  • Agent | None Agent implementation or None.

get_captioner_model

Defined in xtalk.pipelines.interfaces.

def get_captioner_model(self) -> Captioner | None

Return the configured captioner, if available.

Returns
  • Captioner | None Captioner implementation or None.

get_punt_restorer_model

Defined in xtalk.pipelines.interfaces.

def get_punt_restorer_model(self) -> PuntRestorer | None

Return the configured punctuation restorer, if available.

Returns
  • PuntRestorer | None Punctuation restoration model or None.

get_caption_rewriter_model

Defined in xtalk.pipelines.interfaces.

def get_caption_rewriter_model(self) -> Rewriter | None

Return the caption rewriter used by caption-related managers.

Returns
  • Rewriter | None Caption rewriter or None.

get_vad_model

Defined in xtalk.pipelines.interfaces.

def get_vad_model(self) -> VAD

Return the configured voice activity detector, if available.

Returns
  • VAD | None VAD implementation or None.

get_enhancer_model

Defined in xtalk.pipelines.interfaces.

def get_enhancer_model(self) -> SpeechEnhancer | None

Return the configured speech enhancer, if available.

Returns
  • SpeechEnhancer | None Speech enhancement model or None.

get_speaker_encoder

Defined in xtalk.pipelines.interfaces.

def get_speaker_encoder(self) -> SpeakerEncoder | None

Return the configured speaker encoder, if available.

Returns
  • SpeakerEncoder | None Speaker encoder or None.

get_speed_controller

Defined in xtalk.pipelines.interfaces.

def get_speed_controller(self) -> SpeechSpeedController | None

Return the optional TTS speed controller.

Returns
  • SpeechSpeedController | None Speed controller or None.

get_embeddings_model

Defined in xtalk.pipelines.interfaces.

def get_embeddings_model(self) -> Embeddings | None

Return the embeddings model used for retrieval features.

Returns
  • Embeddings | None Embeddings model or None.

get_turn_detector_model

Defined in xtalk.pipelines.interfaces.

def get_turn_detector_model(self) -> TurnDetector | None

Return the configured turn detector, if available.

Returns
  • TurnDetector | None Turn detector or None.

DefaultPipeline

Defined in xtalk.pipelines.default.

@dataclass(init=False)
class DefaultPipeline(Pipeline)

Store the standard Xtalk model bundle for a session.

Parameters

  • asr (ASR) Speech recognition model.
  • llm_agent (Agent) Agent used to generate text responses and tool calls.
  • tts (TTS) Text-to-speech model.
  • default_response (str, optional) Fallback text returned when no better response is available.
  • use_streaming_tts (bool, optional) Whether service managers should prefer streaming TTS when supported.
  • captioner (Captioner | None, optional) Optional audio captioning model.
  • punt_restorer_model (PuntRestorer | None, optional) Optional punctuation restoration model.
  • caption_rewriter (Rewriter | BaseChatModel | None, optional) Optional caption rewriter or chat model to wrap as a rewriter.
  • vad (VAD | None, optional) Optional voice activity detector.
  • speech_enhancer (SpeechEnhancer | None, optional) Optional speech enhancement model.
  • speaker_encoder (SpeakerEncoder | None, optional) Optional speaker embedding model.
  • speech_speed_controller (SpeechSpeedController | None, optional) Optional post-processing speed controller for synthesized audio.
  • embeddings (Embeddings | None, optional) Optional embeddings backend for retrieval features.
  • turn_detector (TurnDetector | None, optional) Optional turn detector that coordinates interruption and generation.

Notes

The dataclass field metadata controls how instances are cloned for each session. Subclasses can add new fields as long as they expose an init_key metadata entry.

Class Fields

  • asr_model: ASR = field(metadata={'init_key': 'asr', 'clone': True})
  • llm_agent: Agent = field(metadata={'init_key': 'llm_agent', 'clone': True})
  • tts_model: TTS = field(metadata={'init_key': 'tts', 'clone': True})
  • default_response: str = field(default="Sorry, I didn't catch that. Could you please say it again?", metadata={'init_key': 'default_response', 'clone': False})
  • use_streaming_tts: bool = field(default=True, metadata={'init_key': 'use_streaming_tts', 'clone': False})
  • captioner_model: Optional[Captioner] = field(default=None, metadata={'init_key': 'captioner', 'clone': False})
  • punt_restorer_model: Optional[PuntRestorer] = field(default=None, metadata={'init_key': 'punt_restorer_model', 'clone': False})
  • caption_rewriter: Optional[Rewriter] = field(default=None, metadata={'init_key': 'caption_rewriter', 'clone': False})
  • vad_model: Optional[VAD] = field(default=None, metadata={'init_key': 'vad', 'clone': True})
  • enhancer_model: Optional[SpeechEnhancer] = field(default=None, metadata={'init_key': 'speech_enhancer', 'clone': True})
  • speaker_encoder: Optional[SpeakerEncoder] = field(default=None, metadata={'init_key': 'speaker_encoder', 'clone': False})
  • embeddings_model: Optional[Embeddings] = field(default=None, metadata={'init_key': 'embeddings', 'clone': False})
  • turn_detector_model: Optional[TurnDetector] = field(default=None, metadata={'init_key': 'turn_detector', 'clone': True})

Methods

init

Defined in xtalk.pipelines.default.

def __init__(self, asr: ASR, llm_agent: Agent, tts: TTS, default_response: str = "Sorry, I didn't catch that. Could you please say it again?", use_streaming_tts: bool = True, captioner: Optional[Captioner] = None, punt_restorer_model: Optional[PuntRestorer] = None, caption_rewriter: Optional[Rewriter | BaseChatModel] = None, vad: Optional[VAD] = None, speech_enhancer: Optional[SpeechEnhancer] = None, speaker_encoder: Optional[SpeakerEncoder] = None, speech_speed_controller: Optional[SpeechSpeedController] = None, embeddings: Optional[Embeddings] = None, turn_detector: Optional[TurnDetector] = None)

Initialize the default pipeline.

Parameters
  • asr (ASR) Speech recognition model.
  • llm_agent (Agent) Agent used for response generation.
  • tts (TTS) Text-to-speech model.
  • default_response (str, optional) Fallback response text.
  • use_streaming_tts (bool, optional) Whether to prefer streaming TTS paths.
  • captioner (Captioner | None, optional) Optional audio captioning model.
  • punt_restorer_model (PuntRestorer | None, optional) Optional punctuation restoration model.
  • caption_rewriter (Rewriter | BaseChatModel | None, optional) Optional caption rewriter or chat model.
  • vad (VAD | None, optional) Optional voice activity detector.
  • speech_enhancer (SpeechEnhancer | None, optional) Optional speech enhancer.
  • speaker_encoder (SpeakerEncoder | None, optional) Optional speaker encoder.
  • speech_speed_controller (SpeechSpeedController | None, optional) Optional speed controller for TTS output.
  • embeddings (Embeddings | None, optional) Optional embeddings backend.
  • turn_detector (TurnDetector | None, optional) Optional turn detector.

clone

Defined in xtalk.pipelines.default.

def clone(self)

Clone the pipeline according to field metadata.

Returns
  • DefaultPipeline New pipeline instance of the same concrete type.
Notes

Fields marked with clone=True call their own clone() method when available. Remaining fields are shared by reference.

get_asr_model

Defined in xtalk.pipelines.default.

def get_asr_model(self) -> ASR | None

get_tts_model

Defined in xtalk.pipelines.default.

def get_tts_model(self) -> TTS | None

get_agent

Defined in xtalk.pipelines.default.

def get_agent(self) -> Agent | None

get_captioner_model

Defined in xtalk.pipelines.default.

def get_captioner_model(self)

get_punt_restorer_model

Defined in xtalk.pipelines.default.

def get_punt_restorer_model(self)

get_caption_rewriter_model

Defined in xtalk.pipelines.default.

def get_caption_rewriter_model(self)

get_vad_model

Defined in xtalk.pipelines.default.

def get_vad_model(self)

get_enhancer_model

Defined in xtalk.pipelines.default.

def get_enhancer_model(self)

get_speaker_encoder

Defined in xtalk.pipelines.default.

def get_speaker_encoder(self)

get_speed_controller

Defined in xtalk.pipelines.default.

def get_speed_controller(self) -> SpeechSpeedController | None

get_embeddings_model

Defined in xtalk.pipelines.default.

def get_embeddings_model(self) -> Embeddings | None

get_turn_detector_model

Defined in xtalk.pipelines.default.

def get_turn_detector_model(self) -> TurnDetector | None

set_tts_model

Defined in xtalk.pipelines.default.

def set_tts_model(self, model_type: str, config: dict) -> None

Switch the active TTS model at runtime.

Parameters
  • model_type (str) Supported model family name, currently "IndexTTS" or "IndexTTS2".
  • config (dict) Runtime configuration passed to the new TTS model constructor.
Raises
  • ValueError Raised if model_type is not supported.

set_llm_model

Defined in xtalk.pipelines.default.

def set_llm_model(self, model: str, base_url: str = '', api_key: str = '', extra_body: dict | None = None) -> None

Replace the current agent LLM with a ChatOpenAI instance.

Parameters
  • model (str) Target model name passed to ChatOpenAI.
  • base_url (str, optional) Override for the OpenAI-compatible API base URL.
  • api_key (str, optional) API key used for the replacement model. Falls back to OPENAI_API_KEY when omitted.
  • extra_body (dict | None, optional) Additional request payload fields forwarded to ChatOpenAI.

Service

Defined in xtalk.serving.service.

class Service

Orchestrate a session-scoped pipeline and manager stack.

Parameters

  • pipeline (Pipeline) Pipeline prototype that will be cloned for the session.
  • service_config (dict[str, Any] | None, optional) Session configuration shared with managers and gateways.
  • manager_classes (list[Type[Manager]] | None, optional) Manager classes to instantiate for live sessions.
  • _websocket (WebSocket | None, optional) Internal WebSocket handle for live sessions. None means the instance acts as a prototype only.
  • _event_overrides (dict[Type[EventListenerMixin], EventOverrides] | None, optional) Internal event subscription overrides copied into cloned sessions.

Methods

init

Defined in xtalk.serving.service.

def __init__(self, *, pipeline: Pipeline, service_config: dict[str, Any] | None = None, manager_classes: list[Type[Manager]] | None = None, _websocket: WebSocket | None = None, _session_id: str | None = None, _event_overrides: dict[Type[EventListenerMixin], EventOverrides] | None = None)

unsubscribe_event

Defined in xtalk.serving.service.

def unsubscribe_event(self, *, event_listener_cls: Type[EventListenerMixin], event_type: Type[BaseEvent], method_name: str | None = None) -> None

Disable an automatic event subscription for a listener class.

Parameters
  • event_listener_cls (Type[EventListenerMixin]) Listener class whose subscription should be disabled.
  • event_type (Type[BaseEvent]) Event type to unsubscribe.
  • method_name (str | None, optional) Specific method name to disable. If omitted, every handler for the event is disabled for the listener class.

subscribe_event

Defined in xtalk.serving.service.

def subscribe_event(self, *, event_listener_cls: Type[EventListenerMixin], event_type: Type[BaseEvent], method_or_handler: str | Callable[[EventListenerMixin, BaseEvent], Any] | Callable[[BaseEvent], Any] | Callable[[EventListenerMixin, BaseEvent], Coroutine[Any, Any, Any]] | Callable[[BaseEvent], Coroutine[Any, Any, Any]], priority: int = 0, enabled_if: Callable[[EventListenerMixin], bool] | None = None) -> None

Register an additional event subscription override.

Parameters
  • event_listener_cls (Type[EventListenerMixin]) Listener class that should receive the event.
  • event_type (Type[BaseEvent]) Event type to subscribe to.
  • method_or_handler (str | Callable) Method name on the listener instance or an external sync/async handler accepting event or (listener, event).
  • priority (int, optional) Subscription priority. Higher values run first.
  • enabled_if (Callable[[EventListenerMixin], bool] | None, optional) Predicate used to decide whether the subscription should be installed for a concrete listener instance.

register_manager

Defined in xtalk.serving.service.

def register_manager(self, manager_cls: Type[Manager])

Register a manager class on the service.

Parameters
  • manager_cls (Type[Manager]) Manager class to add to the service prototype or live session.

unregister_manager

Defined in xtalk.serving.service.

def unregister_manager(self, manager_cls: Type[Manager])

Remove a manager class from the service.

Parameters
  • manager_cls (Type[Manager]) Manager class to remove.

handle_message_loop

Defined in xtalk.serving.service.

async def handle_message_loop(self, already_accepted: bool = False) -> None

Run the full WebSocket message loop for a live session.

Parameters
  • already_accepted (bool, optional) Whether the WebSocket has already been accepted by an upstream limiter or caller.
Raises
  • RuntimeError Raised if the service instance is still a prototype without runtime gateways.

restore_conversation

Defined in xtalk.serving.service.

def restore_conversation(self, *, messages: list[dict[str, Any]]) -> None

Restore persisted conversation history into the session agent.

send_session_attached

Defined in xtalk.serving.service.

async def send_session_attached(self) -> None

Notify the client that the session is attached.

stop

Defined in xtalk.serving.service.

async def stop(self) -> None

Stop the service and shut down all managers.

clone

Defined in xtalk.serving.service.

def clone(self, new_websocket: WebSocket, *, session_id: str | None = None, service_config_overrides: dict[str, Any] | None = None) -> 'Service'

Clone the service prototype for a new WebSocket session.

Parameters
  • new_websocket (WebSocket) WebSocket assigned to the new live session.
Returns
  • Service Cloned service instance of the same concrete type.

DefaultService

Defined in xtalk.serving.service.

class DefaultService(Service)

Convenience Service with the standard Xtalk manager stack.

Notes

Sample applications usually instantiate DefaultService directly and then register or override managers for custom behavior.

Class Fields

  • MANAGER_CLASSES: list[Type[Manager]] = [ASRManager, LLMAgentContextManager, LLMAgentConsumptionManager, DirectAudioManager, TTSManager, TTSPlaybackManager, CaptionerManager, RetrievalManager, TurnTakingManager, LatencyManager, VADManager, EnhancerManager, SpeakerManager, EmbeddingsManager, RecordingManager, TurnDetectorManager]

Methods

init

Defined in xtalk.serving.service.

def __init__(self, *, pipeline: Pipeline, service_config: dict[str, Any] | None = None, manager_classes: list[Type[Manager]] | None = None, _websocket: WebSocket | None = None, _session_id: str | None = None, _event_overrides: dict[Type[EventListenerMixin], EventOverrides] | None = None)

BaseEvent

Defined in xtalk.serving.events.

@dataclass
class BaseEvent

Base dataclass for all Xtalk events.

Parameters

  • session_id (str) Session identifier associated with the event.

Attributes

  • timestamp (float) Unix timestamp recorded when the event instance is created.
  • session_id (str) Session identifier associated with the event.
  • TYPE (str) Stable event type string used by the event bus.

Class Fields

  • timestamp: float = field(init=False)
  • session_id: str
  • TYPE: ClassVar[str] = 'base'

Methods

event_type

Defined in xtalk.serving.events.

def event_type(self) -> str

create_event_class

Defined in xtalk.serving.events.

def create_event_class(*, name: str, fields: dict[str, Any] | None = None, type_name: str | None = None) -> Type[BaseEvent]

Create a BaseEvent subclass dynamically.

Parameters

  • name (str) Dataclass name for the generated event type.
  • fields (dict[str, Any] | None, optional) Mapping of field names to default values. Value types are inferred from the defaults.
  • type_name (str | None, optional) Event bus type string. Defaults to name.lower() when omitted.

Returns

  • Type[BaseEvent] Generated dataclass type inheriting from BaseEvent.

Examples

>>> CustomEvent = create_event_class(
...     name="CustomEvent",
...     fields={"text": ""},
... )

Manager

Defined in xtalk.serving.interfaces.

class Manager(EventListenerMixin, ShutdownMixin)

Base class for Xtalk managers.

Notes

Subclasses typically accept event_bus, session_id, pipeline, and config arguments, then register handlers with @Manager.event_handler.

Methods

event_handler

Defined in xtalk.serving.interfaces.

def event_handler(event_type: Type[BaseEvent], *, priority: int = 0, enabled_if: Callable[['EventListenerMixin'], bool] | None = None)

Declare a manager event handler.

Parameters
  • event_type (Type[BaseEvent]) Event class handled by the decorated method.
  • priority (int, optional) Execution priority for the handler. Higher values run first.
  • enabled_if (Callable[[EventListenerMixin], bool] | None, optional) Predicate evaluated against the manager instance before the handler is registered.
Returns
  • Callable Decorator that marks the method for automatic subscription.

EventBus

Defined in xtalk.serving.event_bus.

class EventBus

Publish and subscribe session events with async dispatch support.

Parameters

  • enable_history (bool, optional) Whether to store published events in memory for later inspection.
  • max_history (int, optional) Maximum number of events kept when history is enabled.

Class Fields

  • MAX_ERROR_EVENT_DEPTH = 3
  • ERROR_EVENT_COOLDOWN = 1.0
  • ERROR_EVENT_RATE_LIMIT = 10

Methods

init

Defined in xtalk.serving.event_bus.

def __init__(self, enable_history: bool = False, max_history: int = 1000)

Initialize the event bus.

Parameters
  • enable_history (bool, optional) Whether to record published events in the in-memory history buffer.
  • max_history (int, optional) Maximum number of events retained in history.

subscribe

Defined in xtalk.serving.event_bus.

def subscribe(self, event_class: Union[Type[BaseEvent], str], handler: Callable[[BaseEvent], Any], priority: int = 0) -> None

Subscribe a handler to an event type.

Parameters
  • event_class (Type[BaseEvent] | str) Event class or event type string such as "tts.started".
  • handler (Callable[[BaseEvent], Any]) Sync or async callable invoked for matching events.
  • priority (int, optional) Higher values run earlier.

unsubscribe

Defined in xtalk.serving.event_bus.

def unsubscribe(self, event_class: Union[Type[BaseEvent], str], handler: Callable) -> bool

Unsubscribe a handler from an event type.

Parameters
  • event_class (Type[BaseEvent] | str) Event class or event type string.
  • handler (Callable) Previously subscribed handler to remove.
Returns
  • bool True if the handler was removed, otherwise False.

publish

Defined in xtalk.serving.event_bus.

async def publish(self, event: BaseEvent, wait_for_completion: bool = False) -> bool

Publish an event to all matching handlers.

Parameters
  • event (BaseEvent) Event instance to dispatch.
  • wait_for_completion (bool, optional) Whether to await handler completion before returning.
Returns
  • bool True on success, otherwise False.

get_history

Defined in xtalk.serving.event_bus.

def get_history(self, event_type: Optional[str] = None, session_id: Optional[str] = None) -> List[BaseEvent]

Retrieve event history with optional filters.

Args: event_type: filter by type session_id: filter by session id

Returns: List of events

get_stats

Defined in xtalk.serving.event_bus.

def get_stats(self) -> Dict[str, Any]

Return current event bus statistics.

Returns: Dict of stats

clear_history

Defined in xtalk.serving.event_bus.

def clear_history(self) -> None

Clear event history.

reset_error_tracking

Defined in xtalk.serving.event_bus.

def reset_error_tracking(self) -> None

Reset error event tracking (useful for testing).

shutdown

Defined in xtalk.serving.event_bus.

async def shutdown(self) -> None

Shut down the event bus and release resources.