python连接kafka-2.0

import sys
import time
import os
import json
import vertica_python
import logging
import pykafka
from pykafka import KafkaClient

#显示时间和编码方式
print('start time', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
print(sys.getdefaultencoding())

#kafka的zookepper地址和broker地址,注意要在/etc/hosts内加上解析,否则会有一大堆有规律的报错
client = KafkaClient(hosts="地址:9092",zookeeper_hosts="地址:2181")
topic = client.topics[b'订阅名']
consumer = topic.get_simple_consumer(consumer_timeout_ms=2000,consumer_group=b'自定义消费者名称')


#输出文件位置/计数初始化
file_output = open('输出文件位置', "w+", encoding='utf8')
file_output.truncate()
a_error_count = 0
a_line_count = 0
print('数据抽取准备完成')

#准备抽取
for message in consumer:
    #用于停止抽取来生成文件
    if message is not None and a_line_count<=20000:
        try:
            str_offset_join = message.value.decode()
            #json头部加上offset,用于唯一标识
            a = '{"offsets":"' + str(message.offset) + '",' + str_offset_join.lstrip('{')
            #使用b来验证数据是json能解析的
            b = json.loads(a)
            file_output.write(a)
            file_output.write('\n')
            a_line_count += 1
        except:
            print('error_message')
            a_error_count += 1
            continue
    else:
        break

#本次消费完成,提交消费进度
consumer.consume()
consumer.commit_offsets()

#导入数据库
file_output.close()
vsql_copy1="copy 表名 from local 数据位置"
vsql_copy1+=" parser fjsonparser() exceptions 数据位置 direct;"
vsql_line1="/opt/vertica/bin/vsql -h 地址 -U 用户名 -w 密码 -At -c \""+vsql_copy1+"\""
print(vsql_line1)
os.popen(vsql_line1)

print('===数据抽取完成==')
print('fetch comlete')
print('a_error_count=', str(a_error_count))
print('a_line_count=', str(a_line_count))
print("load complete")
print('end_time', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

本文转载于网络 如有侵权请联系删除

相关文章

  • 无需代码功能齐全!全新免费交互工具Framer Web到达战场,超好用!

    静电说:说到Framer详细接触UI比较早的同学都有了解,但是之前的Framer上手并不是太容易,需要学习代码且上手难度高,因此不少设计师被劝退。现在FramerWeb来了,这个全新的产品与之前需要客户端的Frame不一样,完全基于Web,且无需任何代码,即可实现强大的交互和高保真演示功能。静电试用了一下,真的还挺强大的。Principle和Flinto等工具瑟瑟发抖中。首先告知大家网址https://www.framer.com/这是一款Webbase工具,不需要客户端。↓↓一起来看看吧!首先是两个介绍视频↓↓从下图的演示可以看出,FramerWeb作出的交互效果还是非常平滑的,与其他交互工具不相上下,甚至更优秀。 甚至可以实现变量效果通过FramerWeb,我们可以完成非常多的交互效果,产品内置了多种预置的原型模板,切换效果非常平滑流畅。打开FramerWeb后,我们发现他的界面非常简约,相信用过Figma的小伙伴都可以轻松上手。通过import选项,我们可以导入来自Sketch,甚至Figma的设计文档。在Insert(插入)选项中,FramerWeb内置了多种效果。Frame中

  • 重庆火锅哪家强,Python帮你探探店

    前言大家好,随着国内疫情逐步好转,一些美食店铺又渐渐开张了,疫情结束你最想吃什么?来一顿痛快的火锅肯定是少不了的活动吧。可是对于选择困难症的我来说,吃饭一小时,挑店一下午,所以今天本文将再次爬取美团网相关数据,以一个消费者的角度去分析如何选择店铺。 数据获取与说明本文将基于重庆市火锅数据进行分析,为啥是重庆的火锅,除了辣之外还因为其他城市的火锅店数量相比重庆的火锅店数量就是小巫见大巫除此之外,相信重庆人民对于火锅的评价会更加地道,所以我们打开重庆市美团网选择火锅进入店铺列表。接下来的操作就和之前类似了:F12—>找到数据包—>requests请求数据—>pandas清洗但是本文需要爬取两层数据,首先需要在店铺列表取得店铺id、店铺名和销量等相关数据,第二步还需要根据刚刚的店铺id进入每一个店铺请求评论数据,相关代码如下forkinurl_list: url=k r=requests.get(url,headers=headers,proxies=proxies) data=r.json()['data']['poiInfos']

  • JfreeChart 乱码问题处理

    在前面之间加上下面这段代码即可。//创建主题样式 StandardChartThemestandardChartTheme=newStandardChartTheme("CN"); //设置标题字体 standardChartTheme.setExtraLargeFont(newFont("隶书",Font.BOLD,20)); //设置图例的字体 standardChartTheme.setRegularFont(newFont("宋书",Font.PLAIN,15)); //设置轴向的字体 standardChartTheme.setLargeFont(newFont("宋书",Font.PLAIN,15)); //应用主题样式 ChartFactory.setChartTheme(standardChartTheme);复制为了验证,先给出没有上面代码的一串代码:importjava.awt.Font; importjavax.swing.JPanel; importorg.jfree.chart

  • Python 学习之 def 函数

    def函数基础def函数的定义 在python中,def函数是用来定义一个function()的。基本使用deffunction_name(): expressions复制示例下面我们定义了一个function()实现打印a的值。deffunction(): a=1+2 print(a)复制当我们运行的时候,输出框是不会输出任何信息的,为什么?当我们定义了一个function(),并不意味着我们要调用它,所以我们需要运行这个function(),一个简单的办法就是在input框中输入function()。自调用如果想要在执行脚本的时候执行一些代码,如单元测试,可以在脚本最后加上单元测试代码。但是该脚本做为一个模块对外提供功能的时候单元测试代码也会执行,这些往往都不是我们想要的,我们可以把这些代码加入到脚本最后。if__name__=='__main__': #code_here复制如果执行该脚本的时候,该if判断语句将会是True,那么内部的代码将会执行。如果外部调用该脚本,if判断语句则为False,内部代码将不会执行。利用自调用函数运行如上的function()

  • 纽约客特稿 | 把癌症诊断交给机器,医疗服务会更好吗?

    选自Newyorker作者:SiddharthaMukherjee机器之心编译参与:侯韵楚、RickR、微胖、吴攀、蒋思源深度学习系统变得越强大,它就越含糊。由于更多的特征被提取了出来,诊断本身变得越来越准确。然而为什么这些特征会从数以百万计的其它特征中被提取出来,这仍然是一个无法回答的问题。 棒球运动员一次又一次地抛了一百万次球,他可能不了解任何方程式,但是他知道球到底会有多高、能达到多大的速度以及它会降落到地面的位置。物理学家可以写方程式来判断同一件事物。但是二者终将殊途同归。——GeoffreyHinton去年11月深夜,一名来自布朗克斯的54岁妇女由于剧烈的头痛而来到哥伦比亚大学医疗中心挂急诊。她对急诊室的医生说,她的视力已经变得模糊,左手无力且伴有麻痹症状。医生检查后,对她的头部进行了CT扫描。几个月后,今年1月份的一个清晨,4名培训中的放射科医师挤在医院三楼的一台电脑前。这个房间昏暗无窗,仅有几丝光线从屏幕映出,就像被海水层层滤过。隔间里挤满了人,哥伦比亚大学的神经放射科主任AngelaLignelli-Dipple正拿着铅笔和pad站在他们身后;她正在培训他们如何看懂CT

  • 远程服务调用PRC发展史

    本文是《凤凰架构》一书的读书笔记。 RPC调用简介 RPC(远程服务调用)是指位于互不重合的内存地址空间中(可以是一台机器也可以是不同网络分区的不同机器)的两个程序,在语言层面上,以同步的方式使用带宽有限的信道来传输程序控制信息。 通过网络进行分布式运算的八宗罪(8FallaciesofDistributedComputing) RPC协议要解决的三个问题 1.如何表示数据(如何进行序列化) 每种RPC协议都应该要有对应的序列化协议。 CORBA的通用数据表示(CommonDataRepresentation,CDR) JavaRMI的Java对象序列化流协议(JavaObjectSerializationStreamProtocol) gRPC的ProtocolBuffers WebService的XML序列化 众多轻量级RPC支持的JSON序列化 2.如何传递数据(使用什么协议来传递数据) 如何传递数据,准确地说,是指如何通过网络,在两个服务的Endpoint之间相互操作、交换数据。 两个服务交互数据不是只扔个序列化数据流来表示参数和结果就行,许多在此之外的信息,譬如异常、

  • elementUI table 表格序号index 翻页 增加

    <el-table-columntype="index"label="序号":index="indexMethod"width="90"align="center"></el-table-column> ...... //序号Index翻页递增currentPage当前页,fetchNum当前页显示条数 indexMethod(index){ return(this.currentPage-1)*this.fetchNum+index+1; }复制  

  • common-dbcp2数据库连接池参数说明

    参数 默认值 描述 建议值 DefaultAutoCommit复制  null   通过这个池创建连接的默认自动提交状态。如果不设置,则setAutoCommit方法将不被调用。复制  true   DefaultReadOnly复制  null 通过这个池创建连接的默认只读状态。如果不设置,则setReadOnly方法将不被调用。(部分驱动不支持只读模式,如:Informix)复制    false   DefaultTransactionIsolation复制 -1    通过这个池创建连接的默认事务策略,设置值为下列中的某一个:Connection.TRANSACTION_READ_COMMITTEDNONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ、SERIALIZABLE、复制 Connection.TRANSACTION_READ_COMMITTED复制     D

  • ERD Crow&#39;s foot ER图

    本科期间学的ER图都没搞这么复杂,master这门课一下搞了一个crow’sfoot有点小麻烦;   crow'sfoot运用:      https://blog.csdn.net/aopstudio/article/details/122772545 实例参考:https://blog.csdn.net/huangbx_tx/article/details/108882613

  • 聊下并发和Tomcat线程数(Updated)

    最近一直在解决线上一个问题,表现是: Tomcat每到凌晨会有一个高峰,峰值的并发达到了3000以上,最后的结果是Tomcat线程池满了,日志看很多请求超过了1s。 服务器性能很好,Tomcat版本是7.0.54,配置如下: <Executorname="tomcatThreadPool"namePrefix="catalina-exec-" maxThreads="3000"minSpareThreads="800"/> <Connectorexecutor="tomcatThreadPool"port="8084"protocol="org.apache.coyote.http11.Http11AprProtocol" connectionTimeout="60000" keepAliveTimeout="30000" maxKeepAliveRequests="8000" maxHttpHeaderSize="8192" URIEncoding="UTF-8" enableLookups="false" acceptCount="1000" disabl

  • 使用systemd让程序自启保活

    进入到“/lib/systemd/system/”目录下,并新建一个STS.service文件,命令如下:   [Unit] Description="STS" After=network.targetntp.target [Service] Type=simple User=root ExecStart=/usr/bin/python/root/pythonProject/sts_server/httpserver.py Restart=always ExecStop= TimeoutStopSec=5 [Install] WantedBy=multi-user.target复制   2.将新添加的自启动服务生效 root@forlinx:~#systemctl-fenable/lib/systemd/system/MAC.service复制   3.启动服务 systemctlstartSTS复制  

  • 【Containerd版】Kubeadm高可用安装K8s集群1.23+

    目录基本环境配置节点规划网段规划及软件版本基本配置内核升级配置K8s组件及Runtime安装Containerd安装K8s组件安装高可用实现集群初始化Master01初始化添加Master节点添加Worker节点CNI插件Calico安装MetricsServer部署Dashboard部署安装登录Dashboard @目录 基本环境配置 成为K8s架构师只需一步,点我了解 节点规划 主机名 IP地址 说明 k8s-master01~03 10.0.0.201~203 master节点*3 k8s-master-lb 10.0.0.236 keepalived虚拟IP k8s-node01~02 10.0.0.204~205 worker节点*2 网段规划及软件版本 配置信息 备注 系统版本 CentOS7.9 Docker版本 20.10.x Pod网段 172.16.0.0/12 Service网段 192.168.0.0/16 基本配置 所有节点配置hosts,修改/etc/hosts如下: 10.0.0.201k8s-m

  • [ AGC001 D] Arrays and Palindrome

    题目 Atcoder 思路 代码 #include<iostream> #include<cstring> #include<algorithm> usingnamespacestd; constintN=110; intn,m,A[N],B[N],t[N]; intmain(){ cin>>n>>m; intcnt=0; t[1]=1,t[2]=m; for(inti=1;i<=m&&cin>>A[i];i++) if(A[i]&1)t[++cnt]=i;//找奇数 if(cnt>2)returncout<<"Impossible"<<endl,0;//判无解 if(m==1&&A[1]==1)returncout<<"1\n1\n1\n"<<endl,0; if(m==1)returncout<<A[1]<<"\n2\n"<<A[1]-1<<

  • Socket(套接字) 理解

    Socket(套接字)理解 这里对socket相关的知识点做下初步总结。 参考资料:https://blog.csdn.net/pashanhu6402/article/details/96428887 1、什么是TCP/IP、UDP、socket? TCP/IP(TransmissionControlProtocol/InternetProtocol)即传输控制协议/网间协议,是一个工业标准的协议集,它是为广域网(WANs)设计的。UDP(UserDataProtocol,用户数据报协议)是与TCP相对应的协议。它是属于TCP/IP协议族中的一种。 这里有一张图,表明了这些协议的关系: TCP/IP协议族包括运输层、网络层、链路层。现在你知道TCP/IP与UDP的关系了吧。 Socket在哪里呢? 在上面图中,我们没有看到Socket的影子,那么它到底在哪里呢?还是用图来说话,一目了然。 Socket是什么呢? Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket

  • django的多表操作

      多表操作:     基于对象的跨表查询(多次查询)       一对一:         正向查询按字段         反向查询按表名小写       一对多:         正向查询按字段(正向查询一定会查出一个来)         反向查询按表名小写_set.all()(返回结果是queryset对象)       多对多:         正向查询按字段.all()(正向查询一定会查出多个来)         反向查询按表名小写_set.all()(返回结果是queryset对象)     基于双下划线的跨表查询       在filter和values中都可以做连表操作(也就是都可以写__)       正向查询按字段       反向查询按表名小写       无论以谁做基表,没有效率之分  

  • 第二阶段团队绩效评分

    评分规则: 1、 工作量(25%):团队中每一个人所完成的工作量。 2、 任务难度或复杂性(20%):每个人认领个各个工作的任务难度和复杂度,包括代码量和所用知识点等。 3、 任务完成的及时(15%):根据第二条推测任务的大致完成时限,团队成员是否在时限内完成了任务。 4、 积极性(15%):是否积极地认领任务,为团队的目标而努力。 5、 团队意识(10%):是否始终以团队的利益为中心,和团队成员的配合是否默契。 6、 站立会议(10%):在站立会议中是否积极发言,提出自己的建议。 7、 学习意识(5%):遇到问题时是否自己思考,自己解决,而不是生搬硬套,直接拿网上的来用。 评估说明:每一项实行十分制,然后进行百分比计算,最后进行求和得出总的分数。 项 姓名 工作量 (25%) 任务难度复杂性 (20%) 任务完成的及时 (15%) 积极性 (15%) 团队意识 (10%)   站立会议(10%) 学习意识(5%) 总成绩(满分10分) 黄珺瑜 9 8

  • java开发环境搭建

    一、java安装 1、安装jdk    下载jdk安装包,一路next 2、设置java环境变量    新建环境变量 JAVA_HOME,设置值为jdk安装目录(D:\ProgramFiles\Java\jdk1.8.0_40)    path环境变量里面增加   %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;   3、验证java安装         二、tomcat安装 1、下载tomcat,解压 2、设置环境变量    新增 CATALINA_HOME,设置为tomcat解压目录(D:\tool\apache-tomcat-8.0.20) 3、验证是否安装成功 打开http://127.0.0.1:8080/ ,看是否能够看到tomcat页面   三、eclipse安装 1、下载eclipse对应

  • 【python】python文件和目录操作方法大全(含实例)

    转自:http://www.jb51.net/article/48001.htm 一、python中对文件、文件夹操作时经常用到的os模块和shutil模块常用方法。1.得到当前工作目录,即当前Python脚本工作的目录路径:os.getcwd()2.返回指定目录下的所有文件和目录名:os.listdir()3.函数用来删除一个文件:os.remove()4.删除多个目录:os.removedirs(r“c:\python”)5.检验给出的路径是否是一个文件:os.path.isfile()6.检验给出的路径是否是一个目录:os.path.isdir()7.判断是否是绝对路径:os.path.isabs()8.检验给出的路径是否真地存:os.path.exists()9.返回一个路径的目录名和文件名:os.path.split()    egos.path.split('/home/swaroop/byte/code/poem.txt')结果:('/home/swaroop/byte/code','poem.txt') 10.分离扩展名

  • 深入解析SQL Server并行执行原理及实践(上)

    在成熟领先的企业级数据库系统中,并行查询可以说是一大利器,在某些场景下他可以显著的提升查询的相应时间,提升用户体验.如SQLServer,Oracle等,Mysql目前还未实现,而PostgreSQL在2015实现了并行扫描,相信他们也在朝着更健壮的企业级数据库迈进.RDBMS中并行执行的实现方式大抵相同,本文将通过SQLServer为大家详细解析SQLServer并行执行的原理及一些实践. 准备知识 硬件环境-在深入并行原理前,我们需要一些准备知识,用以后面理解并行.首先是当下的硬件环境,社会信息化建设,互联网的普及,硬件工艺的大发展…我们现在的硬件设备的性能虽然已经不复摩尔定律的神奇,但已经相当丰富了,存储上15KRPM的硬盘,SSD,PCI-ESSD使得磁盘作为数据库系统的”终极”瓶颈得到了一定的缓解,而内存上百G内存也早已不是小机的”特性”了,PCServer上已经很普遍.在处理能力上,由于现今物理工艺上的极限,单颗CPU处理能力提升困难,系统架构已经朝着多颗多核的架构上迅猛发展,如8路CPU的服务器也已经有一定的使用案例了.总之我们的硬件资源越来越丰富,而本文所讲的并行查询查

  • java如何拿到EasyExcel生成文件,上传到七牛云

    最近笔者遇到一个新需求,就是将一批数据进行导入数据库,但是要将校验不通过的数据生成一个Excel文件,放到七牛云上面,将地址返回前端,让前端自行下载. 我相信一开始很多人都是直接生成文件,然后使用response相应回去给前端的,因为我的做法是一个借口实现导入与将失败的数据导出,所以使用response,前端就无法接收到Message进行提示导入成功多少条数据,失败多少条数据了,废话少说,先看看response响应回去是怎么操作的,上代码 /** *默认的样式设置 * *@return样式信息 */ publicstaticHorizontalCellStyleStrategyexcelStyleSet(){ //头的策略 WriteCellStyleheadWriteCellStyle=newWriteCellStyle(); WriteFontheadWriteFont=newWriteFont(); headWriteFont.setFontHeightInPoints((short)12); headWriteCellStyle.setWriteFont(headWriteF

  • OO第三单元总结

    一、JML知识梳理  JML是用于对java程序进行规格化设计的一种表示语言,通过JML及其支持工具,不仅可以基于规格自动构造测试用例,并整合了SMTSolver等工具以静态方式来检查代码实现对规格的满足情况。 JML理论基础 1.注释结构 JML以注释的方式表示规格,每行都以@起头,有两种注释方式:行注释和块注释,如下所示: 1//@行注释 2 3/*@块注释 4@块注释 5@*/ 6复制 举例代码如下: 1//@publicinstancemodelnon_nullPath[]pList; 2//@publicinstancemodelnon_nullint[]pidList; 3//@publicinvariantpList.length==pidList.length; 4//@publicconstraintMath.abs(pList.length-\old(pList.length))<=1; 5 6/*@publicnormal_behavior 7@requirescontainsPathId(pa

相关推荐

推荐阅读