AMQP 0.9.1 — Spec Coverage
Spec: AMQP 0-9-1 Protocol Specification (AMQP WG, RabbitMQ-hosted) + the AMQP 0-9-1 Complete Reference Guide.
Context: crates/amqp-0-9-1 implements AMQP 0.9.1 as a standalone
protocol (class/method framing, typed field tables, broker model) — separate
from AMQP 1.0 (crates/amqp-bridge + crates/amqp-endpoint). It covers every
class a complete producer/consumer needs: connection, channel, exchange, queue,
basic, confirm (publisher confirms) and tx (transactions). Each item is backed
by a repo path and a test; interop is proven live against RabbitMQ 4.0 plus
cross-protocol against qpid-proton (AMQP 1.0).
Implementation:
crates/amqp-0-9-1/ (docs.rs): class/method framing, typed field tables, content properties, broker model; 25 library tests green, plus live interop against RabbitMQ 4.0.
§4.2 Wire types
§4.2.5 Base types + typed field tables
Spec: §4.2.5 — integers big-endian; shortstr = 1-byte length + bytes
(max 255); longstr = 4-byte length + bytes; bit packed into octets;
field-table (§4.2.5.5) = a longstr-framed list of (shortstr name, typed value) pairs with value types t/b/B/s/u/I/i/l/f/d/D/S/T/F/A/x/V.
Repo: crates/amqp-0-9-1/src/types.rs::{Reader, Writer} — u8/u16/u32/u64
+ signed (i8/i16/i32/i64) + float (f32/f64) big-endian, short_str
(255 enforcement), long_str, pack_bits. Full field-table codec: the
FieldValue enum (all value types incl. nested Table + Array + Decimal
+ Timestamp), Writer::{field_value, field_table} (encode),
Reader::{field_value, field_table} (decode). field_table_strs remains an
S-only fast path for the client-properties advertised in start-ok.
Tests: types::tests::{integer_roundtrip_big_endian, shortstr_longstr_roundtrip, shortstr_rejects_over_255, field_table_strs_then_skip, bit_packing, field_table_typed_roundtrip, unknown_field_type_rejected}.
Status: done
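The wire rules above can be illustrated with a minimal sketch. It is not the crate's Reader/Writer API: `SketchValue`, `put_short_str`, `put_long_str` and `put_field_table` are hypothetical names invented for this example, and only three of the listed value tags (t, I, S) are covered.

```rust
/// Hypothetical value type for this sketch; covers only three of the tags listed above.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchValue {
    Bool(bool),       // tag 't': one octet, 0 or 1
    I32(i32),         // tag 'I': 4 bytes, big-endian
    LongStr(Vec<u8>), // tag 'S': longstr
}

/// shortstr = 1-byte length + bytes. Returns None (and writes nothing)
/// if the string is longer than 255 bytes.
pub fn put_short_str(out: &mut Vec<u8>, s: &str) -> Option<()> {
    if s.len() > 255 {
        return None;
    }
    out.push(s.len() as u8);
    out.extend_from_slice(s.as_bytes());
    Some(())
}

/// longstr = 4-byte big-endian length + bytes.
/// Sketch assumption: the input is shorter than 4 GiB.
pub fn put_long_str(out: &mut Vec<u8>, bytes: &[u8]) {
    out.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
    out.extend_from_slice(bytes);
}

/// field-table = the encoded (name, tag, value) pairs, framed like a longstr.
pub fn put_field_table(out: &mut Vec<u8>, pairs: &[(&str, SketchValue)]) -> Option<()> {
    let mut body = Vec::new();
    for (name, value) in pairs {
        put_short_str(&mut body, name)?;
        match value {
            SketchValue::Bool(b) => {
                body.push(b't');
                body.push(u8::from(*b));
            }
            SketchValue::I32(v) => {
                body.push(b'I');
                body.extend_from_slice(&v.to_be_bytes());
            }
            SketchValue::LongStr(s) => {
                body.push(b'S');
                put_long_str(&mut body, s);
            }
        }
    }
    // The table body is only appended once every pair has encoded successfully.
    put_long_str(out, &body);
    Some(())
}
```

Encoding the table into a scratch buffer first keeps the length prefix correct without back-patching; a production codec might instead reserve four bytes and fill them in afterwards.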
§4.2.2 Frame format
§4.2.2 Frame (type/channel/size/payload/0xCE) + protocol header
Spec: §4.2.2 — protocol header AMQP\x00\x00\x09\x01; each frame is
[type:octet][channel:short][size:long][payload][frame-end:0xCE]; frame types
METHOD(1)/HEADER(2)/BODY(3)/HEARTBEAT(8).
Repo: crates/amqp-0-9-1/src/frame.rs::{Frame, FrameType, PROTOCOL_HEADER, FRAME_END} — encode/decode (with truncation detection +
frame-end validation).
Tests: frame::tests::{frame_roundtrips, decode_reports_truncation, protocol_header_is_0_0_9_1}.
Status: done
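As an illustration of the frame layout above, here is a small sketch of encoding and decoding one frame, including the two failure cases the tests mention (truncation and a wrong frame-end octet). The names `SKETCH_FRAME_END`, `FrameError`, `encode_frame` and `decode_frame` are made up for this example and are not taken from frame.rs.

```rust
/// Protocol header sent by the client before any frame: "AMQP" 0 0 9 1.
pub const SKETCH_PROTOCOL_HEADER: [u8; 8] = *b"AMQP\x00\x00\x09\x01";
/// Every frame ends with this octet.
pub const SKETCH_FRAME_END: u8 = 0xCE;

#[derive(Debug, PartialEq)]
pub enum FrameError {
    /// Fewer bytes available than the header or the size field requires.
    Truncated,
    /// The octet after the payload is not 0xCE.
    BadFrameEnd,
}

/// [type:octet][channel:short][size:long][payload][frame-end:0xCE]
/// Sketch assumption: the payload is shorter than 4 GiB.
pub fn encode_frame(frame_type: u8, channel: u16, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 8);
    out.push(frame_type);
    out.extend_from_slice(&channel.to_be_bytes());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out.push(SKETCH_FRAME_END);
    out
}

/// Decodes one frame from the start of `buf`.
/// Returns (type, channel, payload, bytes consumed).
pub fn decode_frame(buf: &[u8]) -> Result<(u8, u16, &[u8], usize), FrameError> {
    if buf.len() < 7 {
        return Err(FrameError::Truncated);
    }
    let channel = u16::from_be_bytes([buf[1], buf[2]]);
    let size = u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]) as usize;
    // 7 header bytes + payload + 1 frame-end octet.
    let total = size.checked_add(8).ok_or(FrameError::Truncated)?;
    if buf.len() < total {
        return Err(FrameError::Truncated);
    }
    if buf[total - 1] != SKETCH_FRAME_END {
        return Err(FrameError::BadFrameEnd);
    }
    Ok((buf[0], channel, &buf[7..7 + size], total))
}
```

Returning the consumed byte count lets a caller decode several frames from one read buffer; a HEARTBEAT(8) frame on channel 0 is simply `encode_frame(8, 0, &[])`.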
§1.4 connection class
§1.4 connection (start/start-ok/tune/tune-ok/open/open-ok/close)
Spec: §1.4 + §4.2.7 — handshake: server connection.start → client
start-ok (client-properties, mechanism, response, locale) → server tune →
client tune-ok (channel-max/frame-max/heartbeat negotiated) → client open
(vhost) → server open-ok; close/close-ok.
Repo: crates/amqp-0-9-1/src/method.rs::{connection_start_ok, connection_tune_ok, connection_open, connection_tune_params} +
crates/amqp-0-9-1/src/client.rs::Amqp091Client::{connect, close} (drives the
full handshake with PLAIN auth).
Tests: method::tests::{start_ok_carries_plain_response, tune_ok_roundtrips_params}; live tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_self_roundtrip.
Status: done
Heartbeat negotiation (§4.2.7):
tune-ok negotiates the values, and the synchronous client selects heartbeat=0 (disabled). This is spec-compliant: §4.2.7 explicitly allows 0, and a blocking client correctly does not advertise a heartbeat interval it would not service.
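The two client-side decisions in the handshake, the PLAIN response and the tune-ok values, can be sketched as follows. This is a hedged illustration rather than the code in method.rs: `plain_response`, `negotiate_limit`, `SketchTuneOk` and `sketch_tune_ok` are hypothetical names. The "0 means no limit, otherwise take the smaller value" rule is a common client convention assumed here, not something the text above confirms for this crate.

```rust
/// SASL PLAIN response with an empty authorization identity:
/// NUL, user name, NUL, password.
pub fn plain_response(user: &str, password: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(user.len() + password.len() + 2);
    out.push(0);
    out.extend_from_slice(user.as_bytes());
    out.push(0);
    out.extend_from_slice(password.as_bytes());
    out
}

/// Assumed negotiation rule: 0 means "no limit", otherwise the smaller value wins.
pub fn negotiate_limit(server: u32, client: u32) -> u32 {
    match (server, client) {
        (0, c) => c,
        (s, 0) => s,
        (s, c) => s.min(c),
    }
}

/// Hypothetical container for the three tune-ok fields.
#[derive(Debug, PartialEq)]
pub struct SketchTuneOk {
    pub channel_max: u16,
    pub frame_max: u32,
    pub heartbeat: u16,
}

/// Builds tune-ok values from the server's tune proposal and the client's
/// own preferences. The heartbeat is fixed at 0 (disabled), matching the
/// blocking client described above.
pub fn sketch_tune_ok(
    server_channel_max: u16,
    server_frame_max: u32,
    want_channel_max: u16,
    want_frame_max: u32,
) -> SketchTuneOk {
    let channel_max =
        negotiate_limit(u32::from(server_channel_max), u32::from(want_channel_max)) as u16;
    SketchTuneOk {
        channel_max,
        frame_max: negotiate_limit(server_frame_max, want_frame_max),
        heartbeat: 0,
    }
}
```

The cast back to `u16` cannot truncate, because the negotiated value is always one of the two `u16` inputs.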
§1.5 channel class
§1.5 channel.open/open-ok + flow/flow-ok + close/close-ok
Spec: §1.5 — channel.open/open-ok; channel.flow/flow-ok (producer
throttling); channel.close/close-ok (close the channel, the connection
stays open).
Repo: crates/amqp-0-9-1/src/method.rs::{channel_open, channel_flow, channel_flow_ok, channel_close, channel_close_ok} + client.rs::Amqp091Client::{channel_flow, channel_close} (connect opens channel 1).
Tests: method::tests::channel_close_and_flow_ids; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_channel_flow_and_close.
Status: done
§1.6 exchange class
§1.6 exchange.declare/declare-ok + delete/delete-ok
Spec: §1.6 — exchange.declare (type direct/fanout/topic/headers, durable,
…); exchange.delete.
Repo: crates/amqp-0-9-1/src/method.rs::{exchange_declare, exchange_delete}
+ client.rs::Amqp091Client::{exchange_declare, exchange_delete}.
Tests: method::tests::exchange_declare_carries_type; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_exchange_routing (topic
exchange + routing into a bound queue).
Status: done
§1.7 queue class
§1.7 queue.declare/bind/unbind/purge/delete
Spec: §1.7 — queue.declare/declare-ok; queue.bind/unbind (exchange
bindings); queue.purge (+message-count); queue.delete (+message-count).
Repo: crates/amqp-0-9-1/src/method.rs::{queue_declare, queue_bind, queue_unbind, queue_purge, queue_delete, queue_declare_ok_name, queue_op_ok_message_count} + client.rs::Amqp091Client::{queue_declare, queue_bind, queue_unbind, queue_purge, queue_delete}.
Tests: method::tests::{queue_declare_and_publish_ids, queue_bind_unbind_purge_delete_ids, queue_op_ok_count_parsed}; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_exchange_routing.
Status: done
§1.8 basic class
§1.8 basic.publish + content (header/body) + content properties
Spec: §1.8 + §4.2.6 + §1.8.1 — basic.publish followed by a content header
frame (class-id, weight, body-size, property-flags + properties) + body frame(s).
Properties: content-type/-encoding, headers (field table), delivery-mode,
priority, correlation-id, reply-to, expiration, message-id, timestamp, type,
user-id, app-id.
Repo: crates/amqp-0-9-1/src/method.rs::{basic_publish, content_header, content_header_with_props, content_header_body_size, parse_content_header, ContentProperties} + client.rs::Amqp091Client::{publish, publish_with_props}.
Tests: method::tests::{content_header_roundtrip, content_properties_roundtrip, content_header_no_props_matches_legacy}; live tests/rabbitmq_amqp091_e2e.rs::{zerodds_amqp091_self_roundtrip, zerodds_amqp091_publish_consumed_by_proton_10, zerodds_amqp091_content_properties} (persistent delivery-mode + typed headers
round-trip through RabbitMQ).
Status: done
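To make the content header layout concrete, the following sketch encodes a header payload with two of the properties listed above. `sketch_content_header` is a hypothetical helper, not the crate's content_header_with_props. It handles only content-type (property flag bit 15) and delivery-mode (bit 12), and properties are written in flag order, highest bit first.

```rust
/// Class id of the basic class.
const SKETCH_CLASS_BASIC: u16 = 60;
/// Property flag bits, most significant bit first.
const FLAG_CONTENT_TYPE: u16 = 0x8000;
const FLAG_DELIVERY_MODE: u16 = 0x1000;

/// Content header payload:
/// class-id (short), weight (short, always 0), body-size (longlong),
/// property-flags (short), then the properties that are present.
/// Returns None if the content type does not fit into a shortstr.
pub fn sketch_content_header(
    body_size: u64,
    content_type: Option<&str>,
    delivery_mode: Option<u8>,
) -> Option<Vec<u8>> {
    let mut flags: u16 = 0;
    let mut props = Vec::new();
    if let Some(ct) = content_type {
        if ct.len() > 255 {
            return None;
        }
        flags |= FLAG_CONTENT_TYPE;
        props.push(ct.len() as u8); // shortstr length
        props.extend_from_slice(ct.as_bytes());
    }
    if let Some(mode) = delivery_mode {
        flags |= FLAG_DELIVERY_MODE;
        props.push(mode); // 1 = transient, 2 = persistent
    }
    let mut out = Vec::with_capacity(14 + props.len());
    out.extend_from_slice(&SKETCH_CLASS_BASIC.to_be_bytes());
    out.extend_from_slice(&0u16.to_be_bytes());
    out.extend_from_slice(&body_size.to_be_bytes());
    out.extend_from_slice(&flags.to_be_bytes());
    out.extend_from_slice(&props);
    Some(out)
}
```

With no properties set, the payload is the fixed 14 bytes with property-flags 0, which is the case the test name content_header_no_props_matches_legacy suggests the crate also checks.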
§1.8 basic.get/get-ok/get-empty + basic.ack + basic.reject/nack
Spec: §1.8 — synchronous pull: basic.get → get-ok (delivery-tag, …) +
content, or get-empty; basic.ack; basic.reject (+requeue); basic.nack
(RabbitMQ, +multiple/requeue).
Repo: crates/amqp-0-9-1/src/method.rs::{basic_get, basic_get_ok_delivery_tag, basic_ack, basic_ack_delivery_tag, basic_reject, basic_nack} + client.rs::Amqp091Client::{get, get_with_props, get_reject}.
Tests: method::tests::{basic_consume_cancel_qos_ids, ack_delivery_tag_parsed};
live tests/rabbitmq_amqp091_e2e.rs::{zerodds_amqp091_consumes_pika_published, zerodds_amqp091_reject_requeue} + cross-stack
amqp-endpoint/tests/rabbitmq_cross_stack_e2e.rs.
Status: done
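The acknowledgement methods above share one shape: class-id, method-id, a 64-bit delivery-tag, then flag bits packed into a single octet. A minimal sketch of those method payloads follows; `ack_payload`, `reject_payload`, `nack_payload` and `parse_ack` are illustrative names and do not reproduce the signatures in method.rs.

```rust
/// basic class id and the method ids used below.
const BASIC: u16 = 60;
const ACK: u16 = 80;
const REJECT: u16 = 90;
const NACK: u16 = 120; // RabbitMQ extension

/// Shared layout: class-id, method-id, delivery-tag, one octet of packed bits.
fn tagged_payload(method: u16, delivery_tag: u64, bits: u8) -> Vec<u8> {
    let mut out = Vec::with_capacity(13);
    out.extend_from_slice(&BASIC.to_be_bytes());
    out.extend_from_slice(&method.to_be_bytes());
    out.extend_from_slice(&delivery_tag.to_be_bytes());
    out.push(bits);
    out
}

/// basic.ack: bit 0 = multiple.
pub fn ack_payload(delivery_tag: u64, multiple: bool) -> Vec<u8> {
    tagged_payload(ACK, delivery_tag, u8::from(multiple))
}

/// basic.reject: bit 0 = requeue.
pub fn reject_payload(delivery_tag: u64, requeue: bool) -> Vec<u8> {
    tagged_payload(REJECT, delivery_tag, u8::from(requeue))
}

/// basic.nack: bit 0 = multiple, bit 1 = requeue.
pub fn nack_payload(delivery_tag: u64, multiple: bool, requeue: bool) -> Vec<u8> {
    tagged_payload(NACK, delivery_tag, u8::from(multiple) | (u8::from(requeue) << 1))
}

/// Parses a basic.ack method payload into (delivery-tag, multiple).
/// Returns None for short input or a different class/method.
pub fn parse_ack(payload: &[u8]) -> Option<(u64, bool)> {
    if payload.len() < 13 {
        return None;
    }
    let class = u16::from_be_bytes([payload[0], payload[1]]);
    let method = u16::from_be_bytes([payload[2], payload[3]]);
    if class != BASIC || method != ACK {
        return None;
    }
    let mut tag = [0u8; 8];
    tag.copy_from_slice(&payload[4..12]);
    Some((u64::from_be_bytes(tag), payload[12] & 1 == 1))
}
```

These bytes are the payload of a METHOD(1) frame on the channel that received the delivery.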
§1.8 basic.qos + async consume/deliver/cancel
Spec: §1.8 — basic.qos (prefetch window); basic.consume/consume-ok
(subscription) → server-pushed basic.deliver + content → basic.ack;
basic.cancel/cancel-ok.
Repo: crates/amqp-0-9-1/src/method.rs::{basic_qos, basic_consume, basic_cancel, basic_consume_ok_tag, basic_deliver_delivery_tag} +
client.rs::Amqp091Client::{qos, consume_one}.
Tests: method::tests::{basic_consume_cancel_qos_ids, basic_deliver_tag_skips_consumer_tag, consume_ok_tag_parsed}; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_async_consume.
Status: done
confirm class (RabbitMQ extension, class 85)
confirm.select/select-ok + publisher confirms
Spec: RabbitMQ confirm.select — put the channel into confirm mode; each
published message is confirmed by the broker with a server-pushed basic.ack
(delivery-tag).
Repo: crates/amqp-0-9-1/src/method.rs::{confirm_select, basic_ack_delivery_tag} + client.rs::Amqp091Client::{confirm_select, publish_confirmed}.
Tests: method::tests::confirm_and_tx_ids; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_publisher_confirms.
Status: done
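On the publishing side, confirm mode amounts to bookkeeping over delivery tags. The sketch below shows one way to track outstanding publishes; `ConfirmTracker` is a hypothetical type and says nothing about how publish_confirmed is implemented. It assumes RabbitMQ's documented behaviour that tags on a confirm-mode channel count up from 1 and that a basic.ack with the multiple flag confirms every tag up to and including the given one.

```rust
use std::collections::BTreeSet;

/// Hypothetical bookkeeping for one channel in confirm mode.
pub struct ConfirmTracker {
    next_tag: u64,
    pending: BTreeSet<u64>,
}

impl ConfirmTracker {
    /// Call after confirm.select-ok; the first publish gets tag 1.
    pub fn new() -> Self {
        ConfirmTracker { next_tag: 1, pending: BTreeSet::new() }
    }

    /// Records one basic.publish and returns the tag the broker will use for it.
    pub fn on_publish(&mut self) -> u64 {
        let tag = self.next_tag;
        self.next_tag += 1;
        self.pending.insert(tag);
        tag
    }

    /// Applies a server-pushed basic.ack and returns how many publishes
    /// it confirmed (0 for an unknown or already confirmed tag).
    pub fn on_ack(&mut self, delivery_tag: u64, multiple: bool) -> usize {
        if multiple {
            let before = self.pending.len();
            // Keep only the tags the ack does not cover.
            self.pending.retain(|tag| *tag > delivery_tag);
            before - self.pending.len()
        } else if self.pending.remove(&delivery_tag) {
            1
        } else {
            0
        }
    }

    /// Number of publishes still waiting for a confirm.
    pub fn pending(&self) -> usize {
        self.pending.len()
    }
}
```

A blocking client that waits for each confirm before the next publish only ever has one pending tag, so the set matters mainly once publishes are pipelined.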
§1.9 tx class
§1.9 tx.select/commit/rollback
Spec: §1.9 — tx.select (start a transaction), tx.commit, tx.rollback.
Repo: crates/amqp-0-9-1/src/method.rs::{tx_select, tx_commit, tx_rollback}
+ client.rs::Amqp091Client::{tx_select, tx_commit, tx_rollback}.
Tests: method::tests::confirm_and_tx_ids; live
tests/rabbitmq_amqp091_e2e.rs::zerodds_amqp091_transactions (rollback does not
deliver, commit does).
Status: done
Audit status
11 done / 0 partial / 0 open / 0 n/a (informative) / 0 n/a (rejected).
Test run: cargo test -p zerodds-amqp-0-9-1 --lib — 25 tests green, 0 failed.
RabbitMQ interop additionally live (codepit, AMQP_RABBITMQ=1):
cargo test -p zerodds-amqp-0-9-1 --test rabbitmq_amqp091_e2e -- --ignored —
10 tests green; cross-stack cargo test -p zerodds-amqp-endpoint --test rabbitmq_cross_stack_e2e -- --ignored — 2 tests green.