博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ubuntu上跑python连接pg,报错 ImportError: No module named psycopg2
阅读量:5922 次
发布时间:2019-06-19

本文共 3716 字,大约阅读时间需要 12 分钟。

ubuntu上跑python连接pg,报错  ImportError: No module named psycopg2
root@pgproxy1:~# python /home/zxw/PGWriterTest_m.py 
Traceback (most recent call last):
  File "/home/zxw/PGWriterTest_m.py", line 4, in <module>
    import psycopg2
ImportError: No module named psycopg2
例如以下安装:
1
root@pgproxy1:~# apt-cache search psycopg2
python-psycopg2 - Python module for PostgreSQL
python-psycopg2-dbg - Python module for PostgreSQL (debug extension)
python-psycopg2-doc - Python module for PostgreSQL (documentation package)
python3-psycopg2 - Python 3 module for PostgreSQL
python3-psycopg2-dbg - Python 3 module for PostgreSQL (debug extension)
2
root@pgproxy1:~# python -V
Python 2.7.3
3
root@pgproxy1:~# apt-get install -y python-psycopg2 python-psycopg2-doc python-psycopg2-dbg
Reading package lists... Done
Building dependency tree       
Reading state information... Done
...
Processing triggers for libc-bin ...
ldconfig deferred processing now taking place
root@pgproxy1:~#
4
脚本例如以下:
# --encoding:utf-8--
import time
import threading
import psycopg2
import Queue
import datetime

该接口相关能够參考其官网:

http://initd.org/psycopg/

class PGWriterTest(threading.Thread):
    """ 初始化 """
    def __init__(self,connstr):
        self.conn = psycopg2.connect(connstr)
        self.cursor = self.conn.cursor()
        
        self.dbnum  = 4
        self.connArray = []
        self.cursorArray = []
        
        
        for i in range(0,self.dbnum):
            #DB port
            dbidstr = '%02d' % (9900 + i)
            if (i == 0  or i == 2 ):
                dstDB = 'host=ip1 user=dbusername password=pwd dbname=dbname port='+ dbidstr
            else:
                dstDB = 'host=ip2 user=dbusername password=pwd dbname=dbname port='+ dbidstr
            print 'connect ' + dstDB
           
            dstConn =  psycopg2.connect(dstDB)
            self.connArray.append(dstConn)
            dstCursor = dstConn.cursor()
            self.cursorArray.append(dstCursor)
        
        # 执行父类的构造函数
        threading.Thread.__init__(self)
    """ 数据写入数据库 """ 
    def read(self,t_id):
        sql = 'SELECT * from get_cont('+str(t_id)+')' 
        self.cursor.execute(sql)
        datalist = []
        for row in self.cursor.fetchall():
            datalist.append(row)
        return datalist    
    
    """ 数据写入数据库 """ 
    def save(self,t_id,data):
        sql = 'SELECT write_cont((%s,character(255) %s,%d,NOW()::timestamp))'
        params = []
        params.append(t_id)
        params.append(data[1])
        params.append(data[2])
        params.append(data[3])
        #print params
        #取db_ins_id
        db_ins_id = 0
        db_ins_id = t_id % self.dbnum
        try:
            print str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " +  str( t_id ) + " before insert"
            insert_sql = 'SELECT write_cont((%s,character(255) %s,%d,NOW()::timestamp))'
            self.cursorArray[db_ins_id].execute(insert_sql,params)
            self.cursorArray[db_ins_id].execute("COMMIT")
            print str(datetime.datetime.now()) + " DB " + str(db_ins_id) + " " +  str( t_id ) + " inserted"
            return True
        except Exception,ex:
            print("save error:%s" % str(ex))
            print("save t_id:%s\t" % str(t_id))
            print("error cont:%s" % str(params))
            #self.log.write("error param:%s" % str(params))
            #self.log.write("error t_id:%s" % data.get('t_id',''))
            self.cursorArray[db_ins_id].execute("ROLLBACK")
            return False
            
            #測试读取
            """
            try:
                searchsql = 'select t_id from get_cont('+ str(t_id) + ')'
                self.cursorArray[db_ins_id].execute(searchsql)
                data = self.cursorArray[db_ins_id].fetchone()
                if (data == None  or int(data[0]) != t_id ):
                    print ' insert error for ' +str(db_ins_id) + searchsql
            except Exception,searchex:
                print str(searchex) + ' for  ' + searchsql
            """     
        return True
if __name__ == "__main__":
    pgTest = PGWriterTest('host=ip user=DBUser password=pwd dbname=DBNAme port=9999')
    start_time =  str(datetime.datetime.now())
    for i in range(1,10):
        t_id = 1000 + i;
        print str(datetime.datetime.now()) + " getting "
        rows = pgTest.read(t_id)
        
        print str(datetime.datetime.now()) + " gotten "
        #print rows[0]
        pgTest.save(t_id,rows[0])
    print start_time

    print  str(datetime.datetime.now())

----------------- 

转载请著明出处:

blog.csdn.net/beiigang
你可能感兴趣的文章
redis批量删除key
查看>>
MySQL学习笔记——安装及配置环境
查看>>
使用八种牛云存储解决方案ios7.1的app部署问题
查看>>
三种网络协议握手
查看>>
SCWS分词扩展在WINDOWS下的安装方法
查看>>
记录一下SparkStreaming中因为使用redis做数据验证而导致数据结果不对的问题
查看>>
poj 3080 Blue Jeans
查看>>
[leetcode]Subsets
查看>>
安卓游戏 收集钱袋(自制)
查看>>
响应式网站的开发
查看>>
Oracle窗口函数显示想要的行数
查看>>
匹配算法重载方法
查看>>
多级列表——ExpandableListView
查看>>
crm2011i创建nt类型字段
查看>>
nginx+php-fpm 报“File not found.”
查看>>
SWD 接口电路
查看>>
Charles proxy tools 移动开发调试
查看>>
sendmail服务器的安装
查看>>
Java 调用 Javascript 函数的范例
查看>>
拍照 和 选择相册 设置背景图片
查看>>