$ cd ~
$ scala -version
$ cd ~
$ sudo apt install zip unzip
$ curl -s "https://get.sdkman.io" | bash
$ source "/home/hduser/.sdkman/bin/sdkman-init.sh"
$ sdk install scala 2.13.12
$ scala -version
$ cd ~
$ wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
$ tar -xvzf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 kafka
$ chown hduser:hadoop -R kafka
$ cd ~/kafka
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
(Then, press the <enter> key and Zookeeper will continue running in the background.)
$ bin/kafka-server-start.sh config/server.properties &
(Then, press the <enter> key and the Kafka broker will continue running in the background.)
$ jps
Note that you should now see the QuorumPeerMain (Zookeeper) and Kafka processes in the jps output. In total there should be eight entries: QuorumPeerMain, Jps, SecondaryNameNode, ResourceManager, NodeManager, DataNode, Kafka, NameNode
Attention: when you have finished your exercises (before shutting down Windows), you need to stop the Kafka and Zookeeper services:
$ bin/kafka-server-stop.sh
$ bin/zookeeper-server-stop.sh
$ ~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
$ ~/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ ~/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
List all available topics, then carry out the following steps once your Consumer terminal is ready to consume messages.
$ ~/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
A prompt (>) should appear for you to input your messages. For example, type in the following messages:
This is the first test message
This is the second test message
To quit, press Ctrl-C. Note that messages are stored by default under /tmp/kafka-logs/, the directory set as the value of log.dirs in the config/server.properties file.
$ ~/kafka/bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
At your Producer terminal, send more messages (refer to Step 3) and observe the corresponding effects. Note the new messages arriving in the Consumer terminal.
$ ~/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/test-0/00000000000000000000.log
Note: use the file name of the *.log file that actually appears in your directory.
$ cd ~
$ pip install kafka-python
$ cd ~
$ python
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
>>>
>>> for msg in consumer:
...     print("msg: ", msg)
$ cd ~
$ python
>>> from kafka import KafkaProducer
>>> from json import dumps
>>> producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
>>> producer.send('automobile', value='The new Ford Ranger')
>>> import time
>>> i = 1
>>> while True:
...     time.sleep(10)
...     print("i = ", i)
...     producer.send('test', str(i*10) + " seconds have passed")
...     i += 1
>>> producer = KafkaProducer(linger_ms=8000, bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
>>> producer.flush()
>>> future = producer.send('test', 'test message 1')
>>> future = producer.send('test', 'test message 2')
>>> future = producer.send('test', 'test message 3')
In the above example, the 8 s linger delay causes all three messages to appear at the consumer at almost the same time.
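The value_serializer used throughout is just JSON plus UTF-8 encoding, so its round trip can be checked without any broker (a minimal, broker-free sketch):

```python
from json import dumps, loads

# The same lambda passed as value_serializer above.
serialize = lambda x: dumps(x).encode('utf-8')

wire = serialize('test message 1')
print(wire)                          # b'"test message 1"'
print(loads(wire.decode('utf-8')))   # test message 1
```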
$ cd ~
$ pyspark --executor-cores 4
>>> from kafka import KafkaProducer
>>> from json import dumps
>>> import time
>>> producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
>>> i = 1
>>> while True:
...     time.sleep(10)
...     print("i = ", i)
...     producer.send('test', str(i*10) + " seconds have passed", partition=0)
...     i += 1
>>> from kafka import KafkaConsumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
>>> consumer.assign([TopicPartition('test', 2)])
>>> consumer.assign([TopicPartition('test', 0)])
>>> for msg in consumer:
...     print("msg in consumer ", msg)
>>> i = 1
>>> while True:
...     output = "Elapsed time: " + str(i * 10) + "s"
...     producer.send('test', {'message': output})
...     print('message ' + output + " sent")
...     time.sleep(10)
...     i += 1
$ cd ~
$ nano send_messages.py
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
producer.send('automobile', value='The new Proton X70')
producer.send('automobile', value='Proton X50')
producer.flush()
$ python send_messages.py
$ cd ~
$ pip install python-twitter
$ pip install tweepy
Note: the code below uses the tweepy 3.x StreamListener API, which was removed in tweepy 4.0; if needed, install a 3.x release (e.g. pip install "tweepy<4").
Get your API access keys from your Twitter API account. If you do not have a Twitter Developer account yet, you may apply for one at URL https://developer.twitter.com/en/apply-for-access
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import KafkaProducer
import json
access_token = "__"
access_token_secret = "__"
consumer_key = "__"
consumer_secret = "__"
class StdOutListener(StreamListener):
    def on_data(self, data):
        json_ = json.loads(data)
        producer.send("automobile", json_["text"].encode('utf-8'))
        return True

    def on_error(self, status):
        print(status)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, l)
stream.filter(track=["automobile", "car", "transport", "train", "LRT"])
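The listener's on_data only forwards the tweet's "text" field to Kafka, so that JSON handling can be checked on its own, without Twitter credentials (a sketch; the sample payload below is hand-made, not real Twitter data):

```python
import json

def extract_text(raw_data):
    # Mirrors on_data: parse the JSON payload and pull out the "text" field.
    return json.loads(raw_data)["text"]

sample = '{"text": "The new Proton X70 launched", "lang": "en"}'
print(extract_text(sample))   # The new Proton X70 launched
```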