1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{UNIX_EPOCH, SystemTime};
use datatypes::ImportantFunctions as _;
use fast_protocol::datatypes::packets::{KeepAlive, ConfigurationDisconnect, ClientboundPackets, ConfigurationClientboundPluginMessage, RegistryData, RemoveResoucePack, AddResourcePack, FeatureFlags, FinishConfiguration};
use openssl::sha::Sha1;
use rand::RngCore as _;
use tokio::net::tcp::{WriteHalf, ReadHalf};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::io::{AsyncWrite, BufReader};
use tokio::net::TcpStream;
use tokio::time::{Instant, Duration};

use fast_protocol::datatypes::{packets::{State, ServerboundPackets, LegacyPongPacket, PongPacket, StatusResponsePacket, LoginSuccess}, datatype_definition::important_enums::HandshakeNextState};
use binary_utils::DataWriter;

use crate::server::ConfigData;
use crate::{server::{ServerMessage, server_settings::ServerSettings}, player::{Player, PlayerMessages}};

const _UPDATE_RATE: f32 = 100.0;

/// Struct used to handle connections of the players
pub struct ConnectionHandler;
/// Error enum of the different errors that can occur
#[derive(Debug)]
pub enum ConnectionHandlerError {
    /// This error occurs, if an object could not be moved between two threads
    ThreadMovement(String),
    /// This error occurs, if the server was not able to send an packet to the player
    PacketSent(String),
    /// This error is and error used when the server closes the connection
    Shutdown(String),
    /// This error occurs, if the start sequence is not being used
    StartSequence(String),
    /// This error occurs, if an error occurs while reading a packet
    PacketReading(binary_utils::Error),
    /// This error occurs, if a response is not how it es expected
    ReponseError,
    /// This error occurs, if a channel produces an error
    ChannelError,
    /// This error is being used, if the server kicks a player
    KickingPlayer,
    /// This error is being used, if the thread wants to close the `TcpStream` beacause of
    /// inactivity
    CloseConnection,
}
impl From<binary_utils::Error> for ConnectionHandlerError {
    fn from(value: binary_utils::Error) -> Self {
        Self::PacketReading(value)
    }
}
/// An custom error type that keeps track of the error type and the player that the thread was
/// handling
#[derive(Debug)]
pub struct Error {
    /// Type of the error
    pub error_type: ConnectionHandlerError,
    /// instance of the player that the thread handled if one exists
    pub player: Option<Arc<Mutex<fast_protocol::datatypes::json_datastructures::Player>>>,
}
type ErrorResult = std::result::Result<(), ConnectionHandlerError>;
async fn write(packet: &impl DataWriter, writer: &mut (impl AsyncWrite + Unpin + Send)) -> ErrorResult {
    match packet.write(writer).await {
        Ok(_) => Ok(()),
        Err(_) => Err(ConnectionHandlerError::PacketSent(format!("{}:{}", file!(), line!()))),
    }
}
fn minecraft_sha1_hexdigest(input: Sha1) -> String {
    let result = input.finish();
    let mut hex_digest = std::string::String::new();
    for byte in &result {
        hex_digest += &format!("{:02x}", byte);
    }
    if result[0] & 0x80 != 0 {
        format!("-{}", hex_digest)
    } else {
        hex_digest
    }
}
const KICK_TIME: Duration = Duration::from_secs(15);
impl ConnectionHandler {
    async fn handle_incomming_messages<'a>(_writer: &mut WriteHalf<'a>, receiver: &mut mpsc::Receiver<PlayerMessages>, _packet_queue: &mut VecDeque<ClientboundPackets>) {
        while let Ok(message) = receiver.try_recv() {
            match message {
            }
        }
    }
    /// function to handle the keep alive
    ///
    /// # Note
    ///
    /// This function checks if it has to send/receive a packet. it can be run every iteration
    async fn send_keep_alive<'a>(
        state: &State,
        writer: &mut WriteHalf<'a>,
        last_keep_alive_send: &mut Instant,
        keep_alive_answered_at: &Instant,
        await_keep_alive_answer: &mut bool,
        player: &Option<Player>
    ) -> Result<(), (ConnectionHandlerError, Option<Player>)> {
        let now = Instant::now();
        if !now.duration_since(*last_keep_alive_send).lt(&KICK_TIME) {
            match state {
                State::Login | State::Status | State::Handshake => return Err((ConnectionHandlerError::CloseConnection, None)),
                State::Configuration => {
                    if *await_keep_alive_answer || (*last_keep_alive_send).saturating_duration_since(*keep_alive_answered_at).ge(&KICK_TIME) {
                        return Self::disconnect_player(player.clone());
                    } else {
                        let kap = KeepAlive::new(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64);
                        if let Err(e) = kap.write(writer).await {
                            return Err((ConnectionHandlerError::PacketSent(format!("{:?}", e)), player.clone()));
                        } else {
                            *last_keep_alive_send = now;
                            *await_keep_alive_answer = true;
                        }
                    }
                },
                State::Playing => {
                    todo!()
                }
            }
        }
        Ok(())
    }
    /// function to disconnect the player
    ///
    /// # Arguments
    /// `player` - optional player instance
    fn disconnect_player(player: Option<Player>) -> Result<(), (ConnectionHandlerError, Option<Player>)> {
        return Err((ConnectionHandlerError::KickingPlayer, player));
    }
    /// function to handle/receive packets
    ///
    /// # Arguments
    /// `reader` - A `BufReader<ReadHalf>` to read data from the stream
    /// `writer` - A `WriteHalf` to write data into the stream
    /// `sender` - A `Sender<ServerMessage` to be able to communicate with the server
    /// `state` - The current state of the connection
    /// `player` - An optional instance of the player of the connection
    /// `settings` - A reference to the `ServerSettings`
    /// `keep_alive_answered_at` - A mutable reference to an `Instant` that records the last time,
    /// that an `KeepAlive` was answered at
    /// `await_keep_alive_answer` - A mutable reference to a bool whether an keep alive is awaited
    /// or not
    /// `player_sender` - A reference to the player sender instance to read from it
    /// `packet_queue` - A mutable reference to a `VecDeque<ClientboundPackets>` to send packets,
    /// that are queued to be send to the player
    async fn receive_packets<'a>(
        reader: &mut BufReader<ReadHalf<'a>>,
        writer: &mut WriteHalf<'a>,
        sender: &mpsc::Sender<ServerMessage>,
        state: &mut State,
        player: &mut Option<Player>,
        settings: &ServerSettings,
        keep_alive_answered_at: &mut Instant,
        await_keep_alive_answer: &mut bool,
        player_sender: &Sender<PlayerMessages>,
        packet_queue: &mut VecDeque<ClientboundPackets>,
    ) -> Result<(), (ConnectionHandlerError, Option<Player>)> {
        let packet = ServerboundPackets::read(reader, state).await;
        if let Err(err) = packet {
            return Err((ConnectionHandlerError::from(err), player.clone()));
        }
        let packet = packet.unwrap();
        match packet {
            ServerboundPackets::None => {
            },
            ServerboundPackets::LegacyPing => {
                println!("Legacy Ping");
                let (player_sender, player_rec) = oneshot::channel();
                match sender.send(ServerMessage::GetPlayerAmount(player_sender)).await {
                    Ok(()) => (),
                    Err(_) => return Err((ConnectionHandlerError::ReponseError, player.clone())),
                }
                let player_amount = match player_rec.await {
                    Ok(v) => v,
                    Err(_) => return Err((ConnectionHandlerError::ReponseError, player.clone())),
                };
                let packet = LegacyPongPacket::new(settings.version.to_string(), settings.motd.clone(), player_amount, settings.max_players);
                match packet.write(writer).await {
                    Ok(_) => (),
                    Err(_) => return Err((ConnectionHandlerError::CloseConnection, None)),
                }
                return Err((ConnectionHandlerError::CloseConnection, None));
            }
            ServerboundPackets::Handshake(handshake) => {
                println!("next_state: {:?}", handshake.next_state);
                match handshake.next_state.get_value() {
                    HandshakeNextState::Login => *state = State::Login,
                    HandshakeNextState::Status => *state = State::Status,
                }
            },
            ServerboundPackets::PingRequest(ping) => {
                let pong = PongPacket::new(ping.id.get_value());
                match write(&pong, writer).await {
                    Ok(_) => Ok(()),
                    Err(err) => Err((err, player.clone())),
                }?
            },
            ServerboundPackets::StatusRequest(_req) => {
                let (player_sender, player_rec) = oneshot::channel();
                if let Err(_) = sender.send(ServerMessage::GetPlayers(player_sender)).await {
                    return Err((ConnectionHandlerError::ChannelError, player.clone()));
                }
                let players = match player_rec.await {
                    Ok(v) => v,
                    Err(_) => return Err((ConnectionHandlerError::ReponseError, player.clone())),
                };
                let res = StatusResponsePacket::new(
                    settings.version.to_string(),
                    settings.protocol_version,
                    settings.max_players,
                    players.len() as u16,
                    players.iter().map(|p| fast_protocol::datatypes::json_datastructures::Player::new(p.username.clone(), p.uuid.to_string())).collect(),
                    settings.motd.clone(),
                );
                if let Err(err) = res.write(writer).await {
                    return Err((ConnectionHandlerError::from(err), None));
                }
            },
            ServerboundPackets::LoginStart(req) => {
                println!("Username: {}", req.name.get_value());
                *player = Some(Player { uuid: req.uuid.get_value(), username: req.name.get_value(), sender: player_sender.clone(), information: None });
                if settings.offline_mode {
                    let res = LoginSuccess::new(req.uuid, req.name);
                    match res.write(writer).await {
                        Ok(_) => (),
                        Err(err) => return Err((ConnectionHandlerError::from(err), player.clone())),
                    }
                } else {
                    // todo request rsa key from server via sender.send
                    // let res = LoginEncryptionRequest::new(server.rsa_key.clone(), verify_token);
                    // res.write(&mut writer).await?;
                }
            }
            ServerboundPackets::LoginEncryptionResponse(res) => {
                println!("Reading this:");
                let shared_secret = res.shared_secret.get_value();
                let mut _new_shared_secret = vec![0;shared_secret.len()+500];
                println!("{:?}", shared_secret);
                println!("Beeing here (1) {}", shared_secret.len());
                // TODO decript via requested provate key with sender.send
                /*
                server.rsa_key.clone().private_decrypt(
                &shared_secret,
                &mut new_shared_secret,
                Padding::PKCS1);
                */
                println!("Beeing here (2)");
                let mut sha1 = openssl::sha::Sha1::new();
                sha1.update(&shared_secret);
                // TODO use requested rsa key
                // sha1.update(&server.rsa_key.clone().public_key_to_der().unwrap());
                let hash = minecraft_sha1_hexdigest(sha1);
                let username = player.clone().unwrap().username.clone();
                let url = format!("https://sessionserver.mojang.com/session/minecraft/hasJoined?username={username}&serverId={hash}");
                let _response = match fetch_url(&url).await {
                    Ok(v) => Ok(v),
                    Err(_) => Err((ConnectionHandlerError::PacketSent("".to_string()), player.clone())),
                }?;
                println!("Being here!");
            },
            ServerboundPackets::LoginAcknowledged => {
                match sender.send(ServerMessage::AddPlayer(player.clone())).await {
                    Ok(()) => {
                        *state = State::Configuration;
                    },
                    Err(_) => return Err((ConnectionHandlerError::ReponseError, player.clone())),
                }
            }
            ServerboundPackets::Pong => unreachable!(),
            ServerboundPackets::KeepAlive(_keep_alive) => {
                *keep_alive_answered_at = Instant::now();
                *await_keep_alive_answer = false;
            },
            ServerboundPackets::ClientInformation(client_information) => {
                if let Some(player) = player {
                    println!("Received Client Information");
                    use datatypes::ImportantFunctions;
                    let locale = client_information.locale.get_value();
                    let view_distance = client_information.view_distance.get_value() as u8;
                    let chat_mode = client_information.chat_mode.get_value();
                    let chat_colors = client_information.chat_colors.get_value();
                    let displayed_skin_parts = client_information.displayed_skin_parts.get_value();
                    let main_hand = client_information.main_hand.get_value();
                    let text_filtering = client_information.text_filtering.get_value();
                    let in_server_listing = client_information.allow_server_listing.get_value();
                    player.information = Some(
                        crate::player::PlayerInformation { locale , view_distance, chat_mode, chat_colors , displayed_skin_parts, main_hand, text_filtering, in_server_listing }
                    );
                    println!("Finished with this");
                }
                let (config_packets_sender, config_packets_receiver) = oneshot::channel::<Vec<ConfigData>>();
                if let Err(_) = sender.send(ServerMessage::GetConfigData(config_packets_sender)).await {
                    return Err((ConnectionHandlerError::ChannelError, player.clone()));
                }
                let response = match config_packets_receiver.await {
                    Ok(v) => v,
                    Err(_) => return Err((ConnectionHandlerError::ChannelError, player.clone())),
                };
                let response = response.iter().map(|e| match e {
                    ConfigData::PluginMessage(channel, data) => {
                        ClientboundPackets::ConfigurationPluginMessage(
                            ConfigurationClientboundPluginMessage::new(datatypes::Identifier::new(channel.clone()), data.clone()))
                    }
                    ConfigData::RegistryData(nbt) => {
                        ClientboundPackets::RegistryData(RegistryData::new(nbt.clone()))
                    }
                    ConfigData::RemoveResourcePack(uuid) => {
                        ClientboundPackets::RemoveResourcePack(RemoveResoucePack::new(uuid.clone()))
                    }
                    ConfigData::AddResourcePack(uuid, url, hash, forced, prompt_message) => {
                        ClientboundPackets::AddResourcePack(AddResourcePack::new(*uuid, url.clone(), hash.clone(), *forced, prompt_message.clone()))
                    }
                    ConfigData::FeatureFlags(flags) => {
                        ClientboundPackets::FeatureFlags(FeatureFlags::new(flags.iter().map(|s|datatypes::Identifier::new(s.clone())).collect()))
                    }
                });
                packet_queue.extend(response);
                packet_queue.push_back(ClientboundPackets::FinishConfiguration(FinishConfiguration::new()));
                println!("queued packets: {}", packet_queue.len());
            },
            ServerboundPackets::AckFinishConfiguration => {
                println!("Switching to playing");
                *state = State::Playing;
            },
            ServerboundPackets::ResoucePackResponse(res) => {
                use datatypes::ImportantFunctions;
                use fast_protocol::datatypes::packets::ResourcePackResponseEnum as RPRE;
                let uuid = res.uuid;
                let response = res.result.get_value();
                let applied: bool = match response {
                    RPRE::Declined | RPRE::Discarded | RPRE::InvalidUrl | RPRE::FailedToReload | RPRE::FailedToDownload => false,
                    RPRE::Accepted | RPRE::Downloaded | RPRE::SuccessfullyDownloaded => true,
                };
                let (answer_sender, answer_receiver) = tokio::sync::oneshot::channel::<bool>();
                let _ = sender.send(ServerMessage::IsResourcePackImportant(uuid, answer_sender)).await;
                let is_important = answer_receiver.await.unwrap_or(true);
                if is_important && !applied {
                    match ConfigurationDisconnect::new("Important Resource Pack got not applied".to_string()).write(writer).await {
                        Ok(()) => (),
                        Err(_) => return Err((ConnectionHandlerError::PacketSent("".to_string()), player.clone())),
                    }
                }
            },
            ServerboundPackets::ConfigurationPluginMessage(_message) => {
                let channel = _message.channel.get_value();
                let data = _message.data.get_value();
                let _ = sender.send(ServerMessage::PluginMessage(player.clone(), channel, data)).await;
            }
        }
        Ok(())
    }
    /// function to handle the connection
    ///
    /// # Arguments
    /// `stream` - A mutable reference to the `TcpStream` of the connection
    /// `sender` - A `Sender<ServerMessage>` to be able to communicate with the server
    /// `settings` - A instance of the `ServerSettings`
    pub(crate) async fn run(stream: &mut TcpStream,
        sender: mpsc::Sender<ServerMessage>,
        settings: ServerSettings,
    ) -> Result<(), (ConnectionHandlerError, Option<Player>)>{
        println!("Connected");
        let mut player: Option<Player> = None;
        let mut state = State::Handshake;
        let (player_sender, mut player_receiver) = mpsc::channel::<PlayerMessages>(20);
        let (reader, mut writer) = stream.split();
        let mut reader = BufReader::new(reader);
        let mut verify_token = [0;4];
        let mut last_keep_alive_send = Instant::now();
        let mut keep_alive_answered_at = Instant::now();
        let mut await_keep_alive_answer = false;
        rand::thread_rng().fill_bytes(&mut verify_token);
        let mut last_checked = Instant::now();
        let update_time = tokio::time::Duration::from_secs(1).div_f64(100.0);
        let mut packet_queue: VecDeque<fast_protocol::datatypes::packets::ClientboundPackets> = VecDeque::new();
        loop {
            println!("Packet amount: {}", packet_queue.len());
            if let Some(packet) = packet_queue.pop_front() {
                packet.write(&mut writer).await;
            }
            Self::send_keep_alive(&state, &mut writer, &mut last_keep_alive_send, &keep_alive_answered_at, &mut await_keep_alive_answer, &player).await?;
            Self::receive_packets(&mut reader, &mut writer, &sender, &mut state, &mut player, &settings, &mut keep_alive_answered_at, &mut await_keep_alive_answer, &player_sender,
                &mut packet_queue).await?;
            Self::handle_incomming_messages(&mut writer, &mut player_receiver, &mut packet_queue).await;
            let now = Instant::now();
            let sleep_time = now.duration_since(last_checked).saturating_sub(update_time);
            last_checked = now;
            tokio::time::sleep(sleep_time).await;
        }
    }
}
async fn fetch_url(url: &str) -> reqwest::Result<String> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}