
apache kafka - Fetching avro schema for producer from schema registry in code

Confluent provides the following example:

    String key = "key1";
    String userSchema = "{\"type\":\"record\"," +
        "\"name\":\"myrecord\"," +
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(userSchema);
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1", "value1");
    ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
    try {
        producer.send(record);
    } catch (SerializationException e) ...

...Read more
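For the actual question (fetching an existing schema from the Schema Registry instead of hard-coding it), a minimal sketch using Confluent's CachedSchemaRegistryClient; the registry URL and the subject name "topic1-value" (the default <topic>-value naming) are assumptions, not taken from the question:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import org.apache.avro.Schema;

    public class SchemaFetch {
        public static void main(String[] args) throws Exception {
            // hypothetical registry URL; 100 is the client-side schema cache capacity
            CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
            // look up the newest schema registered under the topic's value subject
            SchemaMetadata latest = client.getLatestSchemaMetadata("topic1-value");
            Schema schema = new Schema.Parser().parse(latest.getSchema());
            System.out.println(schema.toString(true));
        }
    }

The parsed Schema can then replace the hard-coded userSchema string when building the GenericRecord.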

apache kafka - Connect consumers jobs are getting deleted when restarting the cluster

I am facing the issue below after changing some Kafka-related properties and restarting the cluster. In Kafka Connect, there were 5 consumer jobs running. If we make an important property change and then restart the cluster, some or all of the existing consumer jobs are not able to start. Ideally all of the consumer jobs should start, since the metadata is kept in the following system topics:

    config.storage.topic
    offset.storage.topic
    status.storage.topic

...Read more
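For context, those three topics are set in the distributed worker configuration; a minimal sketch (the topic names and addresses here are illustrative defaults, not from the question):

    # connect-distributed.properties (illustrative values)
    bootstrap.servers=localhost:9092
    group.id=connect-cluster
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-status

If group.id or any of these topic names changes between restarts, the workers read different internal topics and previously submitted jobs will not reappear.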

Kafka connect JDBC connector

I am trying to use io.confluent.connect.jdbc.JdbcSourceConnector in bulk mode with a custom query:

    query=select name, cast(ID as NUMBER(20,2)), status from table_name

Is this possible? If so, am I missing something? I am getting this exception:

    (org.apache.kafka.connect.runtime.WorkerTask:148)
    org.apache.avro.SchemaParseException: Illegal character in: CAST(IDASNUMBER(20,2))

...Read more
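A plausible fix, inferred from the error rather than confirmed by the excerpt: the computed column has no alias, so the connector derives the Avro field name from the expression itself, which contains characters that are illegal in Avro names. Aliasing the column gives it a legal name:

    # "id_value" is a hypothetical alias chosen for illustration
    query=select name, cast(ID as NUMBER(20,2)) as id_value, status from table_name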

Kafka jdbc sink connector with json schema not working

Using the latest Kafka and Confluent JDBC sink connectors, and sending a really simple JSON message:

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "type": "int", "optional": false, "field": "id" },
          { "type": "string", "optional": true, "field": "msg" }
        ],
        "optional": false,
        "name": "msgschema"
      },
      "payload": { "id": 222, "msg": "hi" }
    }

But getting this error:

    org.ap...

Read more
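Since the error is truncated, two hedged guesses about the cause: Connect's JsonConverter must have schemas enabled to accept this schema/payload envelope, and its integer type names are int8/int16/int32/int64 (plain "int" is not one of them). The relevant worker-side settings look like:

    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true

with the id field declared as "type": "int32".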

Kafka Connect - How to delete a connector

I created a cassandra-sink connector, and after that I made some changes in the connector.properties file. After stopping the worker and starting it again, when I add the connector using:

    java -jar kafka-connect-cli-1.0.6-all.jar create cassandra-sink-orders < cassandra-sink-distributed-orders.properties

I get the following error:

    Error: the Kafka Connect API returned: Connector cassandra-sink-orders already exists (409)

How can I remove the existing connector? ...Read more
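Independently of the CLI jar, the Kafka Connect REST API can remove a connector directly; the host and port below are the usual worker defaults, not taken from the question:

    curl -X DELETE http://localhost:8083/connectors/cassandra-sink-orders

Deleting and re-creating the connector then picks up the edited properties file.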

kafka connect multiple topics in sink connector properties

I am trying to read 2 Kafka topics using the Cassandra sink connector and insert into 2 Cassandra tables. How can I go about doing this? This is my connector.properties file:

    name=cassandra-sink-orders
    connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
    tasks.max=1
    topics=topic1,topic2
    connect.cassandra.kcql=INSERT INTO ks.table1 SELECT * FROM topic1;INSERT INTO ks.table2 SELECT * FROM topic2
    connect.cassandra.contact.points=localhost
    connect.cassandra.port=9042
    connect.cassandra.key.space=ks
    connect.cassandra.con...

Read more

Loading data into oracle table using Kafka jdbc sink

I am trying to load data from Kafka into Oracle using the JDBC sink connector, to replicate the example mentioned on the Confluent website: https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html

    name=jdbc-sink
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    # The topics to consume from - required for sink connectors like this one
    topics=orders
    # Configuration specific to the JDBC sink connector.
    # We want to connect to a SQLite database stored in the ...

Read more
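Since the docs example targets SQLite, the connection settings need an Oracle variant; a hypothetical sketch (host, SID, and credentials are placeholders, not from the question):

    # requires the Oracle JDBC driver jar on the Connect worker's classpath
    connection.url=jdbc:oracle:thin:@localhost:1521:ORCL
    connection.user=my_user
    connection.password=my_password
    auto.create=true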

Kafka Connect java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;

I am trying to set up kafka-connect-cassandra on an AWS instance. I have set plugin.path in the connect-avro-distributed.properties file:

    plugin.path=/home/ubuntu/kafka_2.11-1.0.0/plugins

And I have kafka-connect-cassandra-1.0.0-1.0.0-all.jar in:

    /home/ubuntu/kafka_2.11-1.0.0/plugins/lib

This is the traceback:

    [2018-02-18 10:28:33,268] INFO Kafka Connect distributed worker initializing ... (org.apache.kafka.connect.cli.ConnectDistributed:60)
    [2018-02-18 10:28:33,278] INFO WorkerInfo values:
        jvm.args = -Xmx256M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, ...

...Read more
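For reference, Connect's plugin isolation expects each connector in its own subdirectory directly under plugin.path, with its jars inside; a sketch of that layout (the subdirectory name is illustrative):

    /home/ubuntu/kafka_2.11-1.0.0/plugins/
        kafka-connect-cassandra/
            kafka-connect-cassandra-1.0.0-1.0.0-all.jar

A Guava NoSuchMethodError like the one in the title usually means two different Guava versions ended up on the same classpath, which this per-plugin isolation is meant to prevent.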

upstart script for kafka

I am using an upstart script to start Kafka and ZooKeeper. My steps are:

(1) The following is kafka-zk.conf (put it in /etc/init/):

    kill timeout 300
    start on runlevel [2345]
    stop on runlevel [06]
    pre-start script
    end script
    script
        KAFKA_HOME=/home/My/kafka/kafka_<version>
        exec $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    end script

(2) Create a symbolic link: ln -s /lib/init/upstart-job /etc/init.d/kafka-zk

(3) Run "sudo service kafka-zk start|stop" to start or stop Kafka and ZooKeeper.

My question is: when runni...Read more
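Note that the script above only execs the ZooKeeper server; a hypothetical companion job for the broker itself (reusing the question's placeholder path) could be chained onto it:

    # /etc/init/kafka.conf -- start the broker once the kafka-zk job is up
    kill timeout 300
    start on started kafka-zk
    stop on stopping kafka-zk
    script
        KAFKA_HOME=/home/My/kafka/kafka_<version>
        exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
    end script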

apache kafka - How to expose a headless service for a StatefulSet externally in Kubernetes

Using kubernetes-kafka as a starting point with minikube. This uses a StatefulSet and a headless service for service discovery within the cluster. The goal is to expose the individual Kafka brokers externally, which are internally addressed as:

    kafka-0.broker.kafka.svc.cluster.local:9092
    kafka-1.broker.kafka.svc.cluster.local:9092
    kafka-2.broker.kafka.svc.cluster.local:9092

The constraint is that this external service be able to address the brokers specifically. What's the right (or one possible) way of going about this? Is it possible to expose a exte...Read more
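One possible approach (an assumption, not something the excerpt settles) is a NodePort Service per pod, so each broker stays individually addressable from outside; a quick sketch with kubectl, relying on the per-pod labels that StatefulSet pods carry:

    # one external Service per broker pod (service names are illustrative)
    kubectl expose pod kafka-0 --type=NodePort --port=9092 --name=kafka-0-external
    kubectl expose pod kafka-1 --type=NodePort --port=9092 --name=kafka-1-external
    kubectl expose pod kafka-2 --type=NodePort --port=9092 --name=kafka-2-external

Each broker's advertised.listeners must then advertise the externally reachable address, otherwise clients get redirected back to the cluster-internal DNS names.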

Using flume to import data from kafka topic to hdfs folder

I am using Flume to load messages from a Kafka topic into an HDFS folder. So:

(1) I created a topic TT

(2) I sent messages to TT with a Kafka console producer

(3) I configured the Flume agent FF

(4) Ran the Flume agent: flume-ng agent -n FF -c conf -f flume.conf -Dflume.root.logger=INFO,console

The execution stops without error, and it does not write anything to HDFS. The log file contains this warning:

    No broker partitions consumed by consumer thread flume_-0 for topic.

Any help is greatly appreciated. Thanks in advance. :)...Read more
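For reference, a hypothetical flume.conf for agent FF under those steps (addresses, channel name, and HDFS path are assumptions; the property names match the older ZooKeeper-based Kafka source that the quoted warning suggests is in use):

    FF.sources = kafka-src
    FF.channels = mem-ch
    FF.sinks = hdfs-sink

    # Kafka source reading topic TT via ZooKeeper
    FF.sources.kafka-src.type = org.apache.flume.source.kafka.KafkaSource
    FF.sources.kafka-src.zookeeperConnect = localhost:2181
    FF.sources.kafka-src.topic = TT
    FF.sources.kafka-src.channels = mem-ch

    FF.channels.mem-ch.type = memory

    FF.sinks.hdfs-sink.type = hdfs
    FF.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8020/flume/TT
    FF.sinks.hdfs-sink.channel = mem-ch

The warning usually means the consumer registered but was assigned no partitions, e.g. the configured topic name does not match an existing topic, or another consumer in the same group holds all the partitions.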

How to use Flume's Kafka Channel without specifying a source

I have an existing Kafka topic and a Flume agent that reads from it and writes to HDFS. I want to reconfigure my Flume agent so it moves away from the existing setup (a Kafka source and file channel to an HDFS sink) and uses a Kafka channel instead. I read in the Cloudera documentation that it is possible to achieve this with only a Kafka channel and an HDFS sink (without a Flume source)... (unless I have got the wrong end of the stick.) So I tried to create this configuration, but it isn't working. It's not even starting the Flume process on the box.

    # Test...Read more
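A hypothetical source-less configuration along those lines (agent name, addresses, topic, and paths are assumptions; the kafka.* property names are from the newer Flume Kafka channel):

    a1.channels = kafka-ch
    a1.sinks = hdfs-sink

    a1.channels.kafka-ch.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.kafka-ch.kafka.bootstrap.servers = localhost:9092
    a1.channels.kafka-ch.kafka.topic = my-topic
    # the topic holds plain messages, not serialized Flume events
    a1.channels.kafka-ch.parseAsFlumeEvent = false

    a1.sinks.hdfs-sink.type = hdfs
    a1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8020/flume/out
    a1.sinks.hdfs-sink.channel = kafka-ch

With no source defined, the a1.sources line is simply omitted.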

apache kafka - Streamparse wordcount example

I have been wanting to use Apache Storm to stream from Kafka. I am more comfortable with Python, so I decided to use streamparse (https://github.com/Parsely/streamparse). The word count example is the introductory example, and I have been trying to get it to work on my local machine. I have the following versions of the JDK, lein, and Storm installed:

    Leiningen 2.6.1 on Java 1.8.0_73 Java HotSpot(TM) 64-Bit Server VM

I run the following steps after following streamparse:

    sparse quickstart wordcount
    cd wordcount
    sparse run

I get the following error:

    Retrieving ...Read more