上云无忧 > 文档中心 > 天翼云数据仓库服务使用教程 - 使用Python第三方库psycopg2连接集群
数据仓库服务
天翼云数据仓库服务使用教程 - 使用Python第三方库psycopg2连接集群

文档简介:
本章节主要介绍如何使用Python第三方库psycopg2连接集群。 用户在创建好数据仓库集群后使用psycopg2第三方库连接到集群,则可以使用Python访问DWS ,并进行数据表的各类操作。
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

用户在创建好数据仓库集群后使用psycopg2第三方库连接到集群,则可以使用Python访问DWS ,并进行数据表的各类操作。

连接集群前的准备

  • DWS 集群已绑定弹性IP。
  • 已获取DWS 集群的数据库管理员用户名和密码。
注意

由于MD5算法已经被证实存在碰撞可能,已严禁将之用于密码校验算法。当前DWS 采用默认安全设计,默认禁止MD5算法的密码校验,可能导致开源客户端无法正常连接的问题。建议先检查一下数据库参数password_encryption_type参数是否为1,如果取值不为1,需要修改,修改方法参见《用户指南》的“[修改数据库参数](https://www.ctyun.cn/document/10014061/10047788)”章节;然后修改一次准备使用的数据库用户的密码。

说明
当前DWS出于安全考虑,已经默认不再使用MD5存储密码摘要了,这将导致使用开源驱动或者客户端无法正常连接数据库。需要您调整一下密码策略后再创建一个新用户或者对老用户做一次密码修改,方可使用开源协议中的MD5认证算法。

数据库中是不会存储用户的密码原文,而是存储密码的HASH摘要,在密码校验时与客户端发来的密码摘要进行比对(中间会有加盐操作)。故当您改变了密码算法策略时,数据库也是无法还原您的密码,再生成新的HASH算法的摘要值的。必须您手动修改一次密码或者创建一个新用户,这时新的密码将会采用您设置的HASH算法进行摘要存储,用于下次连接认证。

  • 已获取DWS 集群的公网访问地址,含IP地址和端口。具体请参见 [获取集群连接地址](https://www.ctyun.cn/document/10014061/10047782)。

  • 已安装psycopg2第三方库。下载地址:[https://pypi.org/project/psycopg2/](https://pypi.org/project/psycopg2/ " "),安装部署操作请参见:[https://www.psycopg.org/install/](https://www.psycopg.org/install/ " ")。

说明
  • CentOS、Redhat等操作系统中使用yum命令安装,命令为:yum install python-psycopg2。

  • psycopg2的使用依赖于PostgreSQL的libpq动态库(32位的psycopg2需要对应32位的libpq;64位的psycopg2对应64位的libpq),Linux中可以依赖yum命令解决。在Windows系统使用psycopg2需要先安装libpq,主要方式有两种:

  • 安装PostgreSQL,并配置libpq、ssl、crypto动态库位置到环境变量PATH中。

  • 安装psqlodbc,使用PostgreSQL ODBC驱动携带的libpq、ssl、crypto动态库。

使用约束

由于psycopg2是基于PostgreSQL的客户端接口,它的功能DWS并不能完全支持。具体支持情况请见下表。

说明
以下接口支持情况是基于Python 3.8.5及psycopg 2.9.1版本。

DWS对psycopg2主要接口支持情况

类名 功能描述 函数/成员变量 支持 备注
connections basic

cursor(name=None,cursor_factory=None

,scrollable=None,withhold=False)

Y -
connections basic commit() Y -
connections basic rollback() Y -
connections basic close() Y -
connections Two-phase commit support methods xid(format_id,gtrid,bqual) Y -
connections Two-phase commit support methods tpc_begin(xid) Y -
connections Two-phase commit support methods tpc_prepare() N 内核不支持显式prepare transaction
connections Two-phase commit support methods tpc_commit([xid]) Y -
connections Two-phase commit support methods tpc_rollback([xid]) Y -
connections Two-phase commit support methods tpc_recover() Y -
connections Two-phase commit support methods closed Y -
connections Two-phase commit support methods cancel() Y -
connections Two-phase commit support methods reset() N 不支持DISCARD ALL
connections Two-phase commit support methods dsn Y -
connections Transaction control methods and attributes.

set_session(isolation_level=None

,readonly=None,deferrable=None,autocommit=None)

Y 数据库不支持session中设置default_transaction_read_only
connections Transaction control methods and attributes. autocommit Y -
connections Transaction control methods and attributes. isolation_level Y -
connections Transaction control methods and attributes. readonly N 数据库不支持session中设置default_transaction_read_only
connections Transaction control methods and attributes. deferrable Y -
connections Transaction control methods and attributes. set_isolation_level(level) Y -
connections Transaction control methods and attributes. encoding Y -
connections Transaction control methods and attributes. set_client_encoding(enc) Y -
connections Transaction control methods and attributes. notices N 数据库不支持listen/notify
connections Transaction control methods and attributes. notifies Y -
connections Transaction control methods and attributes. cursor_factory Y -
connections Transaction control methods and attributes. info Y -
connections Transaction control methods and attributes. status Y -
connections Transaction control methods and attributes. lobject N 数据库不支持大对象相关操作
connections Methods related to asynchronous support poll() Y -
connections fileno() Y -
connections isexecuting() Y -
connections Interoperation with other C API modules pgconn_ptr Y -
connections get_native_connection() Y -
connections informative methods of the native connection get_transaction_status() Y -
connections informative methods of the native connection protocol_version Y -
connections informative methods of the native connection server_version Y -
connections informative methods of the native connection get_backend_pid() Y 获取到的不是后台的pid,是逻辑连接的id号
connections informative methods of the native connection get_parameter_status(parameter) Y -
connections informative methods of the native connection get_dsn_parameters() Y -
cursor basic description Y -
cursor basic close() Y -
cursor basic closed Y -
cursor basic connection Y -
cursor basic name Y -
cursor basic scrollable N 数据库不支持SCROLL CURSOR
cursor basic withhold N withhold cursor在commit前需要关闭
cursor Commands execution methods execute(query,vars=None) Y -
cursor Commands execution methods executemany(query,vars_list) Y -
cursor Commands execution methods callproc(procname[,parameters]) Y -
cursor Commands execution methods mogrify(operation[,parameters]) Y -
cursor Commands execution methods setinputsizes(sizes) Y -
cursor Commands execution methods fetchone() Y -
cursor Commands execution methods fetchmany([size=cursor.arraysize]) Y -
cursor Commands execution methods fetchall() Y -
cursor Commands execution methods scroll(value[,mode='relative']) N 数据库不支持SCROLL CURSOR
cursor Commands execution methods arraysize Y -
cursor Commands execution methods itersize Y -
cursor Commands execution methods rowcount Y -
cursor Commands execution methods rownumber Y -
cursor Commands execution methods lastrowid Y -
cursor Commands execution methods query Y -
cursor Commands execution methods statusmessage Y -
cursor Commands execution methods cast(oid,s) Y -
cursor Commands execution methods tzinfo_factory Y -
cursor Commands execution methods nextset() Y -
cursor Commands execution methods setoutputsize(size[,column]) Y -
cursor COPY-related methods copy_from(file,table,sep='\t',null='\N',size=8192,columns=None) Y -
cursor COPY-related methods copy_to(file,table,sep='\t',null='\N',columns=None) Y -
cursor COPY-related methods copy_expert(sql,file,size=8192) Y -
cursor Interoperation with other C API modules pgresult_ptr Y -

在Linux环境使用psycopg2第三方库连接集群

1.以root用户登录Linux环境。

2.执行以下命令创建python_dws.py文件。

vi python_dws.py复制

请复制粘贴以下内容放入python_dws.py文件中:

#!/usr/bin/python 
# -*- coding: UTF-8 -*- 
  
from __future__ import print_function 
  
import psycopg2 
  
  
def create_table(connection): 
    print("Begin to create table") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("drop table if exists test;" 
                       "create table test(id int, name text);") 
        connection.commit() 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("Table created successfully") 
        cursor.close() 
  
  
def insert_data(connection): 
    print("Begin to insert data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("insert into test values(1,'number1');") 
        cursor.execute("insert into test values(2,'number2');") 
        cursor.execute("insert into test values(3,'number3');") 
        connection.commit() 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("Insert data successfully") 
        cursor.close() 
  
  
def update_data(connection): 
    print("Begin to update data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("update test set name = 'numberupdated' where id=1;") 
        connection.commit() 
        print("Total number of rows updated :", cursor.rowcount) 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("After Update, Operation done successfully") 
  
  
def delete_data(connection): 
    print("Begin to delete data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("delete from test where id=3;") 
        connection.commit() 
        print("Total number of rows deleted :", cursor.rowcount) 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("After Delete,Operation done successfully") 
  
  
def select_data(connection): 
    print("Begin to select data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
        print("select failed") 
    else: 
        print("Operation done successfully") 
        cursor.close() 
  
  
if __name__ == '__main__': 
    try: 
        conn = psycopg2.connect(host='10.154.70.231', 
                                port='8000', 
                                database='gaussdb',  # 需要连接的database 
                                user='dbadmin', 
                                password='password')  # 数据库用户密码 
    except psycopg2.DatabaseError as ex: 
        print(ex) 
        print("Connect database failed") 
    else: 
        print("Opened database successfully") 
        create_table(conn) 
        insert_data(conn) 
        select_data(conn) 
        update_data(conn) 
        delete_data(conn) 
        conn.close()复制

3.按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。

psycopg2接口不提供重试连接的能力,您需要在业务代码中实现重试处理。

conn = psycopg2.connect(host='10.154.70.231', 
                                port='8000', 
                                database='gaussdb',  # 需要连接的database 
                                user='dbadmin', 
                                password='password')  # 数据库用户密码复制

4.执行以下命令,使用psycopg第三方库连接集群。

python python_dws.py复制

在Windows环境使用psycopg2第三方库连接集群

1.在Windows系统中,单击“开始”按钮 ,在搜索框中,键入 cmd ,然后在结果列表中单击“cmd.exe”打开命令提示符窗口。

2.在命令提示符窗口中,执行以下命令创建python_dws.py文件。

type nul> python_dws.py复制

请复制粘贴以下内容放入python_dws.py文件中:

请复制粘贴以下内容放入python_dws.py文件中:
#!/usr/bin/python 
# -*- coding:UTF-8 -*- 
 
from __future__ import print_function 
 
import psycopg2 
 
 
def create_table(connection): 
    print("Begin to create table") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("drop table if exists test;" 
                       "create table test(id int, name text);") 
        connection.commit() 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("Table created successfully") 
        cursor.close() 
 
 
def insert_data(connection): 
    print("Begin to insert data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("insert into test values(1,'number1');") 
        cursor.execute("insert into test values(2,'number2');") 
        cursor.execute("insert into test values(3,'number3');") 
        connection.commit() 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("Insert data successfully") 
        cursor.close() 
 
 
def update_data(connection): 
    print("Begin to update data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("update test set name = 'numberupdated' where id=1;") 
        connection.commit() 
        print("Total number of rows updated :", cursor.rowcount) 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("After Update, Operation done successfully") 
 
 
def delete_data(connection): 
    print("Begin to delete data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("delete from test where id=3;") 
        connection.commit() 
        print("Total number of rows deleted :", cursor.rowcount) 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
    else: 
        print("After Delete,Operation done successfully") 
 
 
def select_data(connection): 
    print("Begin to select data") 
    try: 
        cursor = connection.cursor() 
        cursor.execute("select * from test order by 1;") 
        rows = cursor.fetchall() 
        for row in rows: 
            print("id = ", row[0]) 
            print("name = ", row[1], "\n") 
    except psycopg2.ProgrammingError as e: 
        print(e) 
        print("select failed") 
    else: 
        print("Operation done successfully") 
        cursor.close() 
 
 
if __name__ == '__main__': 
    try: 
        conn = psycopg2.connect(host='10.154.70.231', 
                                port='8000', 
                                database='postgresgaussdb',  # 需要连接的database 
                                user='dbadmin', 
                                password='password')  # 数据库用户密码 
    except psycopg2.DatabaseError as ex: 
        print(ex) 
        print("Connect database failed") 
    else: 
        print("Opened database successfully") 
        create_table(conn) 
        insert_data(conn) 
        select_data(conn) 
        update_data(conn) 
        delete_data(conn) 
        conn.close()复制

3.按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。

conn = psycopg2.connect(host='10.154.70.231', 
                                port='8000', 
                                database='gaussdb',  # 需要连接的database 
                                user='dbadmin', 
                                password='password')  # 数据库用户密码复制

4.在命令提示符窗口中,执行以下命令,使用psycopg第三方库连接集群。

python python_dws.py复制

psycopg2连接集群不支持CN Retry特性的问题说明

DWS支持在SQL语句执行出错时的自动重试功能(简称CN Retry)。CN Retry对于客户端和驱动发送的SQL语句在执行失败时可以自动识别错误类型,并进行重试。但使用psycopg2默认连接方式创建的连接在语句执行失败时没有自动重试,会直接报错退出。如常见的主备切换场景下,未自动重试会报如下错误,但在自动重试期间完成主备切换,则会返回正确结果。

psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004,
 detail: could not connect to server: Operation now in progress复制

报错原因:

1.psycopg2在发送SQL语句前先发送了BEGIN语句开启事务。

2.CN Retry不支持事务块中的语句是特性约束。

解决方案:

  • 在同步方式连接时,可以通过主动结束驱动开启的事务。
cursor = conn.cursor() 
# 增加end语句主动结束驱动开启的事务 
cursor.execute("end; select * from test order by 1;")  
rows = cursor.fetchall()复制
  • 使用异步连接方式主动开启事务,异步连接介绍具体请参见pyscopg官网:https://www.psycopg.org/docs/advanced.html?highlight=async。
#!/usr/bin/env python3 
# _*_ encoding=utf-8 _*_ 
  
import psycopg2 
import select 
  
# psycopg2官方提供的异步连接方式时的wait函数 
# 详见https://www.psycopg.org/docs/advanced.html?highlight=async 
def wait(conn): 
    while True: 
        state = conn.poll() 
        if state == psycopg2.extensions.POLL_OK: 
            break 
        elif state == psycopg2.extensions.POLL_WRITE: 
            select.select([], [conn.fileno()], []) 
        elif state == psycopg2.extensions.POLL_READ: 
            select.select([conn.fileno()], [], []) 
        else: 
            raise psycopg2.OperationalError("poll() returned %s" % state) 
  
def psycopg2_cnretry_sync(): 
    # 创建连接 
    conn = psycopg2.connect(host='10.154.70.231', 
                                port='8000', 
                                database='gaussdb',  # 需要连接的database 
                                user='dbadmin', 
                                password='password',  # 数据库用户密码 
                                async=1) # 使用异步方式连接 
    wait(conn) 
  
    # 执行查询 
    cursor = conn.cursor() 
    cursor.execute("select * from test order by 1;") 
    wait(conn) 
    rows = cursor.fetchall() 
    for row in rows: 
        print(row[0], row[1]) 
  
    # 关闭连接 
    conn.close() 
  
if __name__ == '__main__': 
    psycopg2_cnretry_async()
相似文档
  • 本章节主要介绍如何使用Python第三方库PyGreSQL连接集群。 用户在创建好数据仓库集群后使用PyGreSQL第三方库连接到集群,则可以使用Python访问DWS ,并进行数据表的各类操作。
  • 本章节主要介绍如何管理数据库连接。 操作场景: 数据库默认支持一定数量的连接,管理员用户可以通过管理数据库的连接,了解当前数据库的连接性能,或增加连接限制使更多用户或应用程序可以同时连接到数据库。
  • 本章节主要介绍如何修改数据库参数。 集群创建成功后,用户可以根据实际需要修改集群的数据库参数。在DWS 管理控制台,您可以查看或设置一些常用的数据库参数,详情请参见操作步骤。如需查看或设置其他数据库参数,您可以通过SQL命令的方式,详情请参见《数据仓库服务数据库开发指南》中的“配置GUC参数”章节。
  • 本章节主要介绍如何变更集群规格。 相比于扩容节点而言,变更规格功能更适合阶段性峰值或只对计算能力变化有诉求的业务场景,在业务峰值来临之前您可以通过变更规格快速提升集群计算能力,在业务峰值过后再快速的将集群配置降低,做到最大程度的节约成本。
  • 本章节主要介绍如何查看集群状态。 在DWS 管理控制台的“集群管理”页面,用户可以在集群列表中查看集群的概要信息,例如集群状态、任务信息、节点规格和近期事件等信息。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部