Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Created March 3, 2025 13:32
Show Gist options
  • Select an option

  • Save Gsantomaggio/3baff80b8b74ce7a41719698099ed34f to your computer and use it in GitHub Desktop.

Select an option

Save Gsantomaggio/3baff80b8b74ce7a41719698099ed34f to your computer and use it in GitHub Desktop.

Revisions

  1. Gsantomaggio created this gist Mar 3, 2025.
    93 changes: 93 additions & 0 deletions ex.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,93 @@
    # type: ignore


    from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert,
    AddressHelper,
    AMQPMessagingHandler,
    Connection,
    Environment,
    Event,
    ExchangeSpecification,
    ExchangeToQueueBindingSpecification,
    Message,
    OutcomeState,
    QuorumQueueSpecification,
    )

    MESSAGES_TO_PUBLISH = 100


    def create_connection(environment: Environment) -> Connection:
    connection = environment.connection()
    connection.dial()
    return connection


    def main() -> None:
    exchange_name = "orders"
    queue_goods_warehouse = "goods_warehouse"
    queue_sales_office = "sales_office"
    routing_key = "#"

    print("connection to amqp server")
    environment = Environment(uri="amqp://guest:guest@localhost:5672/")
    connection = create_connection(environment)

    management = connection.management()

    print("declaring exchange and queue")
    management.declare_exchange(ExchangeSpecification(name=exchange_name))

    management.declare_queue(
    QuorumQueueSpecification(name=queue_goods_warehouse)
    # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
    )

    management.declare_queue(
    QuorumQueueSpecification(name=queue_sales_office)
    # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
    )

    print("binding queue to exchange")
    bind_name = management.bind(
    ExchangeToQueueBindingSpecification(
    source_exchange=exchange_name,
    destination_queue=queue_goods_warehouse,
    binding_key=routing_key,
    )
    )

    bind_name = management.bind(
    ExchangeToQueueBindingSpecification(
    source_exchange=exchange_name,
    destination_queue=queue_sales_office,
    binding_key=routing_key,
    )
    )

    addr = AddressHelper.exchange_address(exchange_name, routing_key)


    print("create a publisher and publish a test message")
    publisher = connection.publisher(addr)

    print("purging the queue")


    # publish 10 messages
    for i in range(MESSAGES_TO_PUBLISH):
    print("publishing")
    status = publisher.publish(Message(body="test"))
    if status.remote_state == OutcomeState.ACCEPTED:
    print("message accepted")
    elif status.remote_state == OutcomeState.RELEASED:
    print("message not routed")
    elif status.remote_state == OutcomeState.REJECTED:
    print("message not rejected")

    publisher.close()



    if __name__ == "__main__":
    main()