Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions src/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,61 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use std::time::Duration;
use uuid::Uuid;

pub struct ConsumeConfig<'a> {
pub topic: &'a BrokerAndTopic,
pub partition: Option<i32>,
pub filter: &'a Option<String>,
pub num_messages: Option<i64>,
pub offset: Option<i64>,
pub last: Option<i64>,
pub timestamp: Option<i64>,
pub key: bool,
pub terse: bool,
pub kafka_config: Option<Vec<KafkaOption>>,
}

#[allow(clippy::too_many_arguments)]
Comment thread
Tom-Willemsen marked this conversation as resolved.
pub fn consume(
topic: &BrokerAndTopic,
partition: Option<i32>,
filter: &Option<String>,
num_messages: Option<i64>,
offset: Option<i64>,
last: Option<i64>,
timestamp: Option<i64>,
kafka_config: Option<Vec<KafkaOption>>,
) {
pub fn consume(config: &ConsumeConfig) {
debug!(
"Listening to topic: {} partition {:?} on broker {}:{}, filtering {}",
topic.topic,
partition,
topic.host,
topic.port,
filter.as_deref().unwrap_or("none")
config.topic.topic,
config.partition,
config.topic.host,
config.topic.port,
config.filter.as_deref().unwrap_or("none")
);
let mut config = ClientConfig::new();
config.set("group.id", Uuid::new_v4().to_string());
config.set("bootstrap.servers", topic.broker());
let mut client_config = ClientConfig::new();
client_config.set("group.id", Uuid::new_v4().to_string());
client_config.set("bootstrap.servers", config.topic.broker());

if let Some(kafka_options) = kafka_config {
if let Some(kafka_options) = &config.kafka_config {
for option in kafka_options {
println!(
"Setting Kafka config option {}={}",
option.key, option.value
);
config.set(&option.key, &option.value);
client_config.set(&option.key, &option.value);
}
}

let consumer: BaseConsumer = config.create().expect("Base creation failed");
let consumer: BaseConsumer = client_config.create().expect("Base creation failed");

let start: Option<Offset>;

let (low_watermark, high_watermark) = consumer
.fetch_watermarks(&topic.topic, partition.unwrap_or(0), Duration::from_secs(1))
.unwrap_or_else(|_| panic!("Failed to get watermarks for topic {}", topic.topic));
.fetch_watermarks(
&config.topic.topic,
config.partition.unwrap_or(0),
Duration::from_secs(1),
)
.unwrap_or_else(|_| panic!("Failed to get watermarks for topic {}", config.topic.topic));
let num_messages_on_topic = high_watermark - low_watermark;

if let Some(_timestamp) = timestamp {
if let Some(_timestamp) = config.timestamp {
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(
&topic.topic,
partition.unwrap_or(0),
&config.topic.topic,
config.partition.unwrap_or(0),
Offset::Offset(_timestamp),
)
.expect("Can't add partition to consumer with timestamp");
Expand All @@ -68,13 +76,13 @@ pub fn consume(
.expect("No topics found for timestamp")
.offset(),
);
} else if let Some(_offset) = offset {
} else if let Some(_offset) = config.offset {
assert!(
_offset >= low_watermark && _offset <= high_watermark,
"offset ({_offset:?}) must be between high ({high_watermark}) and low({low_watermark}) watermarks"
);
start = Some(Offset::Offset(_offset));
} else if let Some(last_num_messages) = last {
} else if let Some(last_num_messages) = config.last {
assert!(
last_num_messages <= num_messages_on_topic,
"Cannot consume {last_num_messages:?} messages from a topic which only has {num_messages_on_topic} messages"
Expand All @@ -87,36 +95,49 @@ pub fn consume(
if let Some(_start) = start {
info!("Starting at {_start:?}");
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic.topic, partition.unwrap_or(0), _start)
tpl.add_partition_offset(&config.topic.topic, config.partition.unwrap_or(0), _start)
.unwrap_or_else(|_| panic!("Failed to set partition offset to {:?}", _start));
consumer.assign(&tpl).expect("Failed to assign to topic");
}

consumer
.subscribe(&[&topic.topic])
.unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", topic.topic));
.subscribe(&[&config.topic.topic])
.unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", config.topic.topic));

let mut counter = 0;
for message in consumer.iter() {
match message {
Ok(message) => {
if partition.is_some() && Some(message.partition()) != partition {
if config.partition.is_some() && Some(message.partition()) != config.partition {
continue;
}

match message.payload() {
Some(p) => {
if let Some(f) = filter {
if let Some(f) = config.filter {
if let Some(schema_id) = get_schema_id(p)
&& schema_id != f.to_bytes()
{
continue;
}
debug!("Message has no schema id, ignoring filter")
}

print!("[partition={}", message.partition());
if config.key {
print!(" key={:?}", message.key().unwrap_or(b""));
}
print!("] ");
match deserialize_message(p) {
Ok(d) => println!("{d:?}"),
Ok(d) => {
if config.terse {
let schema = get_schema_id(p)
.and_then(|s| str::from_utf8(s).ok())
.unwrap_or("invalid");
println!("{schema} ({} bytes)", p.len())
} else {
println!("{d:?}")
}
}
Err(e) => error!("Failed to deserialize message: {e:?}"),
}
}
Expand All @@ -132,7 +153,7 @@ pub fn consume(
}
}
counter += 1;
if Some(counter) == num_messages || Some(counter) == last {
if Some(counter) == config.num_messages || Some(counter) == config.last {
println!("Reached {} messages, exiting", counter);
break;
}
Expand Down
23 changes: 17 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod sniff;

use crate::cli_utils::BrokerAndOptionalTopic;
use crate::cli_utils::KafkaOption;
use crate::consume::ConsumeConfig;
use crate::count::count;
use crate::howl::{EventMessageConfig, HowlConfig, howl};
use crate::sniff::sniff;
Expand Down Expand Up @@ -45,7 +46,13 @@ enum Commands {
/// Print last x messages on topic
#[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])]
last: Option<i64>,
// Additonal command line arguments
/// Show message key
#[arg(long, action=clap::ArgAction::SetTrue)]
key: bool,
/// Print using terse format (just schema ID and length)
#[arg(long, action=clap::ArgAction::SetTrue)]
terse: bool,
/// Additonal Kafka options
#[arg(short = 'X', long)]
kafka_config: Option<Vec<KafkaOption>>,
},
Expand Down Expand Up @@ -126,17 +133,21 @@ async fn main() {
offset,
last,
timestamp,
key,
terse,
kafka_config,
} => consume::consume(
&topic,
} => consume::consume(&ConsumeConfig {
topic: &topic,
partition,
&filter,
messages,
filter: &filter,
num_messages: messages,
offset,
last,
timestamp,
key,
terse,
kafka_config,
),
}),
Commands::Sniff {
broker,
kafka_config,
Expand Down
Loading