上云无忧 > 文档中心 > 天翼云分布式消息服务RabbitMQ使用C\C++代码示例
分布式消息服务RabbitMQ
天翼云分布式消息服务RabbitMQ使用C\C++代码示例

文档简介:
C\C++API 创建连接 char const *hostname; int port, status; amqp_socket_t *socket; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_ssl_socket_new(conn); amqp_ssl_socket_set_verify_peer(socket, 0); amqp_ssl_socket_set_verify_hostname(socket, 0);
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

C\C++API

创建连接

  char const *hostname;
  int port, status;
  amqp_socket_t *socket;
  amqp_connection_state_t conn;

  conn = amqp_new_connection();

  socket = amqp_ssl_socket_new(conn);

  amqp_ssl_socket_set_verify_peer(socket, 0);
  amqp_ssl_socket_set_verify_hostname(socket, 0);

  status = amqp_socket_open(socket, hostname, port);

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest",


关闭连接

  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");


创建信道

  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");


关闭信道

  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");


生产消息


  {
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; /* persistent delivery mode */
    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes(exchange),
                                    amqp_cstring_bytes(routingkey),
                                    0,
                                    0,
                                    &props,
                                    amqp_cstring_bytes(messagebody)),
                 "Publishing");
  }


消费消息

  {
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
                                 amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) {
      fprintf(stderr, "Out of memory while copying queue name");
      return 1;
    }
  }


使用示例

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <stdint.h>
#include <amqp_ssl_socket.h>
#include <amqp_framing.h>

#include "utils.h"

int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port, status;
  char const *exchange;
  char const *routingkey;
  char const *messagebody;
  amqp_socket_t *socket;
  amqp_connection_state_t conn;

  if (argc < 6) {
    fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey "
            "messagebody [cacert.pem [verifypeer] [verifyhostname] "
            "[key.pem cert.pem]]\n");
    return 1;
  }

  hostname = argv[1];
  port = atoi(argv[2]);
  exchange = argv[3];
  routingkey = argv[4];
  messagebody = argv[5];

  conn = amqp_new_connection();

  socket = amqp_ssl_socket_new(conn);
  if (!socket) {
    die("creating SSL/TLS socket");
  }

  amqp_ssl_socket_set_verify_peer(socket, 0);
  amqp_ssl_socket_set_verify_hostname(socket, 0);

  if (argc > 6) {
    int nextarg = 7;
    status = amqp_ssl_socket_set_cacert(socket, argv[6]);
    if (status) {
      die("setting CA certificate");
    }
    if (argc > nextarg && !strcmp("verifypeer", argv[nextarg])) {
      amqp_ssl_socket_set_verify_peer(socket, 1);
      nextarg++;
    }
    if (argc > nextarg && !strcmp("verifyhostname", argv[nextarg])) {
      amqp_ssl_socket_set_verify_hostname(socket, 1);
      nextarg++;
    }
    if (argc > nextarg + 1) {
      status =
          amqp_ssl_socket_set_key(socket, argv[nextarg + 1], argv[nextarg]);
      if (status) {
        die("setting client cert");
      }
    }
  }

  status = amqp_socket_open(socket, hostname, port);
  if (status) {
    die("opening SSL/TLS connection");
  }

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                    "Logging in");
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; /* persistent delivery mode */
    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes(exchange),
                                    amqp_cstring_bytes(routingkey),
                                    0,
                                    0,
                                    &props,
                                    amqp_cstring_bytes(messagebody)),
                 "Publishing");
  }

  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
  die_on_error(amqp_destroy_connection(conn), "Ending connection");
  return 0;
}

相似文档
  • 购买实例 登录管理控制台。 进入RabbitMQ管理控制台。 在管理控制台右上角单击“地域名称”,选择区域。 此处请选择与您的应用服务相同的区域。 点击“购买实例”跳转到购买页面。
  • 查看实例 登录管理控制台。 进入RabbitMQ管理控制台。 当前页面会列出所购买的RabbitMQ实例,并查看状态,状态说明如下
  • 创建和删除虚拟主机 Vhost 虚拟主机(Virtual Host),类似于 Namespace 命名空间的概念,逻辑隔离,每个用户里可以创建多个 Vhost,每个 Vhost 可以创建若干个 Exchange 和 Queue。 1.登录管理控制台。 2.进入RabbitMQ管理控制台。
  • 创建、修改和删除用户 1.登录管理控制台。 2.进入RabbitMQ管理控制台。 3.在实例列表页在操作列,目标实例行点击“管理”。 4.点击“集群管理”后点击“用户”到达用户管理页面,点击“新建”按钮。
  • 创建、修改和删除用户策略 1.登录管理控制台。 2.进入RabbitMQ管理控制台。 3.在实例列表页在操作列,目标实例行点击“管理”。 4.点击“集群管理”后点击“用户策略”到达用户策略管理页面,点击“新建”按钮。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部