The Kafka Connector for Presto allows you to access data from Apache Kafka using Presto.
Prerequisites
Download and install the latest version of the following Apache projects.
- Apache ZooKeeper
- Apache Kafka
Start ZooKeeper
Start the ZooKeeper server using the following command.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper now listens on port 2181.
Start Kafka
Start Kafka in another terminal using the following command.
$ bin/kafka-server-start.sh config/server.properties
Once Kafka starts, it listens on port 9092.
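Before loading data, it can help to confirm that both services are actually accepting connections. The following is a minimal sketch, assuming Bash (it relies on Bash's /dev/tcp pseudo-device) and that both servers run on localhost with the ports mentioned above. The name port_open is a hypothetical helper, not part of ZooKeeper or Kafka.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeeds (exit status 0) if host:port accepts a TCP connection.
port_open() {
  local host="$1" port="$2"
  # Open the socket inside a subshell so it is closed again when the subshell exits.
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

# Check the ports used by ZooKeeper and Kafka in this chapter.
for svc in "ZooKeeper:2181" "Kafka:9092"; do
  name="${svc%%:*}"
  port="${svc##*:}"
  if port_open localhost "$port"; then
    echo "$name is listening on port $port"
  else
    echo "$name is NOT listening on port $port"
  fi
done
```

If either service is reported as not listening, check the terminal where it was started for errors before continuing.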
TPCH Data
Download kafka-tpch
$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
The above command downloads the loader from Maven Central. While it runs, you will see progress output similar to the following.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k
………………………..
Then, make it executable using the following command.
$ chmod 755 kafka-tpch
Run kafka-tpch
Run the kafka-tpch program to preload a number of topics with TPC-H data using the following command.
Query
$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
Result
2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….
The TPC-H tables customer, orders, supplier, etc., are now loaded into Kafka topics named tpch.customer, tpch.orders, tpch.supplier, and so on.
Add Config Settings
Add the following Kafka connector configuration settings on the Presto server, in the catalog file etc/catalog/kafka.properties.
connector.name = kafka
kafka.nodes = localhost:9092
kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns = false
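In the above configuration, the names listed in kafka.table-names are the topics loaded by the kafka-tpch program. Setting kafka.hide-internal-columns to false makes the connector's internal columns (such as _message) visible when a table is described.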
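Each entry in kafka.table-names is the loader's --prefix value followed by a TPC-H table name, so the list can be generated rather than typed by hand. The following is a minimal sketch, assuming the prefix tpch. and the eight tables reported in the loader output. The variable names are illustrative only.

```shell
#!/usr/bin/env bash
# Prefix passed to kafka-tpch via --prefix, and the tables it reported loading.
prefix="tpch."
tables="customer orders lineitem part partsupp supplier nation region"

# Join prefix + table with commas, with no spaces and no trailing comma.
names=""
for t in $tables; do
  names="${names:+$names,}${prefix}${t}"
done

# Print the line in the same form as the connector configuration.
echo "kafka.table-names = ${names}"
```

The printed line can be pasted into the connector configuration in place of the hand-written list.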
Start Presto CLI
Start the Presto CLI using the following command.
$ ./presto --server localhost:8080 --catalog kafka --schema tpch
Here, “tpch” is the schema for the Kafka connector. You will see the following prompt.
presto:tpch>
List Tables
The following query lists all the tables in the “tpch” schema.
Query
presto:tpch> show tables;
Result
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
Describe Customer Table
The following query describes the “customer” table.
Query
presto:tpch> describe customer;
Result
 Column            | Type    | Comment
-------------------+---------+---------------------------------------------
 _partition_id     | bigint  | Partition Id
 _partition_offset | bigint  | Offset for the message within the partition
 _segment_start    | bigint  | Segment start offset
 _segment_end      | bigint  | Segment end offset
 _segment_count    | bigint  | Running message count per segment
 _key              | varchar | Key text
 _key_corrupt      | boolean | Key data is corrupt
 _key_length       | bigint  | Total number of key bytes
 _message          | varchar | Message text
 _message_corrupt  | boolean | Message data is corrupt
 _message_length   | bigint  | Total number of message bytes
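With the internal columns visible, the raw Kafka messages can be inspected with ordinary SQL. The sketch below wraps the Presto CLI's --execute option so that a single statement can be run without entering the interactive prompt. The names kafka_query, PRESTO_BIN, COUNT_SQL and PEEK_SQL are hypothetical, introduced only for illustration. The server address, catalog and schema are the ones used earlier in this chapter.

```shell
#!/usr/bin/env bash
# PRESTO_BIN is a hypothetical override for the CLI path; it defaults to ./presto.
PRESTO_BIN="${PRESTO_BIN:-./presto}"

# Run one SQL statement against the kafka catalog and tpch schema.
kafka_query() {
  "$PRESTO_BIN" --server localhost:8080 --catalog kafka --schema tpch --execute "$1"
}

# Example statements; nothing is executed until one is passed to kafka_query.
COUNT_SQL='SELECT count(*) FROM customer'
PEEK_SQL='SELECT _partition_id, _partition_offset, _message FROM customer LIMIT 5'
```

For example, kafka_query "$PEEK_SQL" would print the partition, offset and raw message text for up to five messages from the customer topic, assuming the Presto server is running.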