Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 105 additions & 28 deletions lightning/src/routing/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,42 +293,69 @@ impl PendingChecks {
Ok(())
}

fn pending_channel_announcement_matches(
msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>,
pending_state: &UtxoMessages,
) -> bool {
match &pending_state.channel_announce {
Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
None => {
// This can be reached if `resolve_single_future` has already consumed
// `channel_announce` via `.take()` while the `Arc<Mutex<UtxoMessages>>` is still
// alive (e.g. held on the stack of `check_resolved_futures`). In that case,
// `complete` should also have been taken. Treat it as non-matching and let the
// new request fly.
debug_assert!(
pending_state.complete.is_none(),
"channel_announce is None but complete is still pending"
);
false
},
}
}

fn check_replace_previous_entry(
msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>,
replacement: Option<Weak<Mutex<UtxoMessages>>>,
replacement: Option<(&Arc<Mutex<UtxoMessages>>, &UtxoMessages)>,
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>,
) -> Result<(), msgs::LightningError> {
let replacement_state = replacement.map(|(state, _)| state);
match pending_channels.entry(msg.short_channel_id) {
hash_map::Entry::Occupied(mut e) => {
// There's already a pending lookup for the given SCID. Check if the messages
// are the same and, if so, return immediately (don't bother spawning another
// lookup if we haven't gotten that far yet).
match Weak::upgrade(&e.get()) {
Some(pending_msgs) => {
// This may be called with the mutex held on a different UtxoMessages
// struct, however in that case we have a global lockorder of new messages
// -> old messages, which makes this safe.
let pending_state = pending_msgs.unsafe_well_ordered_double_lock_self();
let pending_matches = match &pending_state.channel_announce {
Some(ChannelAnnouncement::Full(pending_msg)) => {
Some(pending_msg) == full_msg
let pending_matches = match replacement {
Some((replacement, replacement_messages))
if Arc::ptr_eq(&pending_msgs, replacement) =>
{
// The pending entry points to the state whose mutex the caller
// already holds. Compare through the held guard instead of locking
// it again.
Self::pending_channel_announcement_matches(
msg,
full_msg,
replacement_messages,
)
},
Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
None => {
// This can be reached if `resolve_single_future` has already
// consumed `channel_announce` via `.take()` while the
// `Arc<Mutex<UtxoMessages>>` is still alive (e.g. held on
// the stack of `check_resolved_futures`). In that case,
// `complete` should also have been taken. Treat it as
// non-matching and let the new request fly.
debug_assert!(
pending_state.complete.is_none(),
"channel_announce is None but complete is still pending"
_ => {
// This may be called with the mutex held on a different
// UtxoMessages struct, however in that case we have a global
// lockorder of new messages -> old messages, which makes this safe.
let pending_state =
pending_msgs.unsafe_well_ordered_double_lock_self();
let matches = Self::pending_channel_announcement_matches(
msg,
full_msg,
&pending_state,
);
false
drop(pending_state);
matches
},
};
drop(pending_state);
if pending_matches {
return Err(LightningError {
err: "Channel announcement is already being checked".to_owned(),
Expand All @@ -340,25 +367,25 @@ impl PendingChecks {
// Note that in the replace case whether to replace is somewhat
// arbitrary - both results will be handled, we're just updating the
// value that will be compared to future lookups with the same SCID.
if let Some(item) = replacement {
*e.get_mut() = item;
if let Some(item) = replacement_state {
*e.get_mut() = Arc::downgrade(item);
}
}
},
None => {
// The earlier lookup already resolved. We can't be sure its the same
// so just remove/replace it and move on.
if let Some(item) = replacement {
*e.get_mut() = item;
if let Some(item) = replacement_state {
*e.get_mut() = Arc::downgrade(item);
} else {
e.remove();
}
},
}
},
hash_map::Entry::Vacant(v) => {
if let Some(item) = replacement {
v.insert(item);
if let Some(item) = replacement_state {
v.insert(Arc::downgrade(item));
}
},
}
Expand Down Expand Up @@ -442,7 +469,7 @@ impl PendingChecks {
Self::check_replace_previous_entry(
msg,
full_msg,
Some(Arc::downgrade(&future.state)),
Some((&future.state, &async_messages)),
&mut pending_checks.channels,
)?;
async_messages.channel_announce = Some(if let Some(msg) = full_msg {
Expand Down Expand Up @@ -1028,6 +1055,56 @@ mod tests {
assert!(!is_test_feature_set);
}

#[test]
fn test_no_deadlock_same_future_different_announcement() {
// A user's UtxoLookup may return the same UtxoFuture for repeated lookups for a
// given SCID. A different channel_announcement with that SCID should replace the
// pending message without re-locking the already-held future state.
let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
let scid = valid_announcement.contents.short_channel_id;

let notifier = Arc::new(Notifier::new());
let future = UtxoFuture::new(Arc::clone(&notifier));
*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

assert_eq!(
network_graph
.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
.unwrap_err()
.err,
"Channel being checked async"
);
assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);

let secp_ctx = Secp256k1::new();
let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
let replacement_announcement = get_signed_channel_announcement(
|msg| msg.features.set_unknown_feature_optional(),
replacement_pk_1,
replacement_pk_2,
&secp_ctx,
);
assert_eq!(
network_graph
.update_channel_from_announcement(&replacement_announcement, &Some(&chain_source))
.unwrap_err()
.err,
"Channel being checked async"
);
assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);

future
.resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
assert!(notifier.notify_pending());
network_graph.pending_checks.check_resolved_futures(&network_graph);
#[rustfmt::skip]
let is_replacement_feature_set =
network_graph.read_only().channels().get(&scid).unwrap().announcement_message
.as_ref().unwrap().contents.features.supports_unknown_test_feature();
assert!(is_replacement_feature_set);
}

#[test]
fn test_checks_backpressure() {
// Test that too_many_checks_pending returns true when there are many checks pending, and
Expand Down