You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

2861 lines
81 KiB

  1. /*
  2. * This file is part of GNUnet
  3. * Copyright (C) 2013 GNUnet e.V.
  4. *
  5. * GNUnet is free software: you can redistribute it and/or modify it
  6. * under the terms of the GNU Affero General Public License as published
  7. * by the Free Software Foundation, either version 3 of the License,
  8. * or (at your option) any later version.
  9. *
  10. * GNUnet is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Affero General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Affero General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. SPDX-License-Identifier: AGPL3.0-or-later
  18. */
  19. /**
  20. * @file psyc/gnunet-service-psyc.c
  21. * @brief PSYC service
  22. * @author Gabor X Toth
  23. */
  24. #include <inttypes.h>
  25. #include "platform.h"
  26. #include "gnunet_util_lib.h"
  27. #include "gnunet_constants.h"
  28. #include "gnunet_protocols.h"
  29. #include "gnunet_statistics_service.h"
  30. #include "gnunet_multicast_service.h"
  31. #include "gnunet_psycstore_service.h"
  32. #include "gnunet_psyc_service.h"
  33. #include "gnunet_psyc_util_lib.h"
  34. #include "psyc.h"
  35. /**
  36. * Handle to our current configuration.
  37. */
  38. static const struct GNUNET_CONFIGURATION_Handle *cfg;
  39. /**
  40. * Service handle.
  41. */
  42. static struct GNUNET_SERVICE_Handle *service;
  43. /**
  44. * Handle to the statistics service.
  45. */
  46. static struct GNUNET_STATISTICS_Handle *stats;
  47. /**
  48. * Handle to the PSYCstore.
  49. */
  50. static struct GNUNET_PSYCSTORE_Handle *store;
  51. /**
  52. * All connected masters.
  53. * Channel's pub_key_hash -> struct Master
  54. */
  55. static struct GNUNET_CONTAINER_MultiHashMap *masters;
  56. /**
  57. * All connected slaves.
  58. * Channel's pub_key_hash -> struct Slave
  59. */
  60. static struct GNUNET_CONTAINER_MultiHashMap *slaves;
  61. /**
  62. * Connected slaves per channel.
  63. * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
  64. */
  65. static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
  66. /**
  67. * Message in the transmission queue.
  68. */
  69. struct TransmitMessage
  70. {
  71. struct TransmitMessage *prev;
  72. struct TransmitMessage *next;
  73. struct GNUNET_SERVICE_Client *client;
  74. /**
  75. * ID assigned to the message.
  76. */
  77. uint64_t id;
  78. /**
  79. * Size of message.
  80. */
  81. uint16_t size;
  82. /**
  83. * Type of first message part.
  84. */
  85. uint16_t first_ptype;
  86. /**
  87. * Type of last message part.
  88. */
  89. uint16_t last_ptype;
  90. /* Followed by message */
  91. };
  92. /**
  93. * Cache for received message fragments.
  94. * Message fragments are only sent to clients after all modifiers arrived.
  95. *
  96. * chan_key -> MultiHashMap chan_msgs
  97. */
  98. static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
  99. /**
  100. * Entry in the chan_msgs hashmap of @a recv_cache:
  101. * fragment_id -> RecvCacheEntry
  102. */
  103. struct RecvCacheEntry
  104. {
  105. struct GNUNET_MULTICAST_MessageHeader *mmsg;
  106. uint16_t ref_count;
  107. };
  108. /**
  109. * Entry in the @a recv_frags hash map of a @a Channel.
  110. * message_id -> FragmentQueue
  111. */
  112. struct FragmentQueue
  113. {
  114. /**
  115. * Fragment IDs stored in @a recv_cache.
  116. */
  117. struct GNUNET_CONTAINER_Heap *fragments;
  118. /**
  119. * Total size of received fragments.
  120. */
  121. uint64_t size;
  122. /**
  123. * Total size of received header fragments (METHOD & MODIFIERs)
  124. */
  125. uint64_t header_size;
  126. /**
  127. * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
  128. */
  129. uint64_t state_delta;
  130. /**
  131. * The @a flags field from struct GNUNET_PSYC_MessageMethod.
  132. */
  133. uint32_t flags;
  134. /**
  135. * Receive state of message.
  136. *
  137. * @see MessageFragmentState
  138. */
  139. uint8_t state;
  140. /**
  141. * Whether the state is already modified in PSYCstore.
  142. */
  143. uint8_t state_is_modified;
  144. /**
  145. * Is the message queued for delivery to the client?
  146. * i.e. added to the recv_msgs queue
  147. */
  148. uint8_t is_queued;
  149. };
  150. /**
  151. * List of connected clients.
  152. */
  153. struct ClientList
  154. {
  155. struct ClientList *prev;
  156. struct ClientList *next;
  157. struct GNUNET_SERVICE_Client *client;
  158. };
  159. struct Operation
  160. {
  161. struct Operation *prev;
  162. struct Operation *next;
  163. struct GNUNET_SERVICE_Client *client;
  164. struct Channel *channel;
  165. uint64_t op_id;
  166. uint32_t flags;
  167. };
  168. /**
  169. * Common part of the client context for both a channel master and slave.
  170. */
  171. struct Channel
  172. {
  173. struct ClientList *clients_head;
  174. struct ClientList *clients_tail;
  175. struct Operation *op_head;
  176. struct Operation *op_tail;
  177. struct TransmitMessage *tmit_head;
  178. struct TransmitMessage *tmit_tail;
  179. /**
  180. * Current PSYCstore operation.
  181. */
  182. struct GNUNET_PSYCSTORE_OperationHandle *store_op;
  183. /**
  184. * Received fragments not yet sent to the client.
  185. * message_id -> FragmentQueue
  186. */
  187. struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
  188. /**
  189. * Received message IDs not yet sent to the client.
  190. */
  191. struct GNUNET_CONTAINER_Heap *recv_msgs;
  192. /**
  193. * Public key of the channel.
  194. */
  195. struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
  196. /**
  197. * Hash of @a pub_key.
  198. */
  199. struct GNUNET_HashCode pub_key_hash;
  200. /**
  201. * Last message ID sent to the client.
  202. * 0 if there is no such message.
  203. */
  204. uint64_t max_message_id;
  205. /**
  206. * ID of the last stateful message, where the state operations has been
  207. * processed and saved to PSYCstore and which has been sent to the client.
  208. * 0 if there is no such message.
  209. */
  210. uint64_t max_state_message_id;
  211. /**
  212. * Expected value size for the modifier being received from the PSYC service.
  213. */
  214. uint32_t tmit_mod_value_size_expected;
  215. /**
  216. * Actual value size for the modifier being received from the PSYC service.
  217. */
  218. uint32_t tmit_mod_value_size;
  219. /**
  220. * Is this channel ready to receive messages from client?
  221. * #GNUNET_YES or #GNUNET_NO
  222. */
  223. uint8_t is_ready;
  224. /**
  225. * Is the client disconnected?
  226. * #GNUNET_YES or #GNUNET_NO
  227. */
  228. uint8_t is_disconnecting;
  229. /**
  230. * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
  231. */
  232. uint8_t is_master;
  233. union {
  234. struct Master *master;
  235. struct Slave *slave;
  236. };
  237. };
  238. /**
  239. * Client context for a channel master.
  240. */
  241. struct Master
  242. {
  243. /**
  244. * Channel struct common for Master and Slave
  245. */
  246. struct Channel channel;
  247. /**
  248. * Private key of the channel.
  249. */
  250. struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
  251. /**
  252. * Handle for the multicast origin.
  253. */
  254. struct GNUNET_MULTICAST_Origin *origin;
  255. /**
  256. * Transmit handle for multicast.
  257. */
  258. struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
  259. /**
  260. * Incoming join requests from multicast.
  261. * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
  262. */
  263. struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
  264. /**
  265. * Last message ID transmitted to this channel.
  266. *
  267. * Incremented before sending a message, thus the message_id in messages sent
  268. * starts from 1.
  269. */
  270. uint64_t max_message_id;
  271. /**
  272. * ID of the last message with state operations transmitted to the channel.
  273. * 0 if there is no such message.
  274. */
  275. uint64_t max_state_message_id;
  276. /**
  277. * Maximum group generation transmitted to the channel.
  278. */
  279. uint64_t max_group_generation;
  280. /**
  281. * @see enum GNUNET_PSYC_Policy
  282. */
  283. enum GNUNET_PSYC_Policy policy;
  284. };
  285. /**
  286. * Client context for a channel slave.
  287. */
  288. struct Slave
  289. {
  290. /**
  291. * Channel struct common for Master and Slave
  292. */
  293. struct Channel channel;
  294. /**
  295. * Private key of the slave.
  296. */
  297. struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
  298. /**
  299. * Public key of the slave.
  300. */
  301. struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
  302. /**
  303. * Hash of @a pub_key.
  304. */
  305. struct GNUNET_HashCode pub_key_hash;
  306. /**
  307. * Handle for the multicast member.
  308. */
  309. struct GNUNET_MULTICAST_Member *member;
  310. /**
  311. * Transmit handle for multicast.
  312. */
  313. struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
  314. /**
  315. * Peer identity of the origin.
  316. */
  317. struct GNUNET_PeerIdentity origin;
  318. /**
  319. * Number of items in @a relays.
  320. */
  321. uint32_t relay_count;
  322. /**
  323. * Relays that multicast can use to connect.
  324. */
  325. struct GNUNET_PeerIdentity *relays;
  326. /**
  327. * Join request to be transmitted to the master on join.
  328. */
  329. struct GNUNET_PSYC_Message *join_msg;
  330. /**
  331. * Join decision received from multicast.
  332. */
  333. struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
  334. /**
  335. * Maximum request ID for this channel.
  336. */
  337. uint64_t max_request_id;
  338. /**
  339. * Join flags.
  340. */
  341. enum GNUNET_PSYC_SlaveJoinFlags join_flags;
  342. };
  343. /**
  344. * Client context.
  345. */
  346. struct Client {
  347. struct GNUNET_SERVICE_Client *client;
  348. struct Channel *channel;
  349. };
  350. struct ReplayRequestKey
  351. {
  352. uint64_t fragment_id;
  353. uint64_t message_id;
  354. uint64_t fragment_offset;
  355. uint64_t flags;
  356. };
  357. static void
  358. transmit_message (struct Channel *chn);
  359. static uint64_t
  360. message_queue_run (struct Channel *chn);
  361. static uint64_t
  362. message_queue_drop (struct Channel *chn);
  363. static void
  364. schedule_transmit_message (void *cls)
  365. {
  366. struct Channel *chn = cls;
  367. transmit_message (chn);
  368. }
  369. /**
  370. * Task run during shutdown.
  371. *
  372. * @param cls unused
  373. */
  374. static void
  375. shutdown_task (void *cls)
  376. {
  377. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  378. "shutting down...\n");
  379. GNUNET_PSYCSTORE_disconnect (store);
  380. if (NULL != stats)
  381. {
  382. GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
  383. stats = NULL;
  384. }
  385. }
  386. static struct Operation *
  387. op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
  388. uint64_t op_id, uint32_t flags)
  389. {
  390. struct Operation *op = GNUNET_malloc (sizeof (*op));
  391. op->client = client;
  392. op->channel = chn;
  393. op->op_id = op_id;
  394. op->flags = flags;
  395. GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
  396. return op;
  397. }
  398. static void
  399. op_remove (struct Operation *op)
  400. {
  401. GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
  402. GNUNET_free (op);
  403. }
  404. /**
  405. * Clean up master data structures after a client disconnected.
  406. */
  407. static void
  408. cleanup_master (struct Master *mst)
  409. {
  410. struct Channel *chn = &mst->channel;
  411. GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
  412. GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
  413. }
  414. /**
  415. * Clean up slave data structures after a client disconnected.
  416. */
  417. static void
  418. cleanup_slave (struct Slave *slv)
  419. {
  420. struct Channel *chn = &slv->channel;
  421. struct GNUNET_CONTAINER_MultiHashMap *
  422. chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
  423. &chn->pub_key_hash);
  424. GNUNET_assert (NULL != chn_slv);
  425. GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
  426. if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
  427. {
  428. GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
  429. chn_slv);
  430. GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
  431. }
  432. GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
  433. if (NULL != slv->join_msg)
  434. {
  435. GNUNET_free (slv->join_msg);
  436. slv->join_msg = NULL;
  437. }
  438. if (NULL != slv->relays)
  439. {
  440. GNUNET_free (slv->relays);
  441. slv->relays = NULL;
  442. }
  443. GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
  444. }
  445. /**
  446. * Clean up channel data structures after a client disconnected.
  447. */
  448. static void
  449. cleanup_channel (struct Channel *chn)
  450. {
  451. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  452. "%p Cleaning up channel %s. master? %u\n",
  453. chn,
  454. GNUNET_h2s (&chn->pub_key_hash),
  455. chn->is_master);
  456. message_queue_drop (chn);
  457. GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
  458. chn->recv_frags = NULL;
  459. if (NULL != chn->store_op)
  460. {
  461. GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
  462. chn->store_op = NULL;
  463. }
  464. (GNUNET_YES == chn->is_master)
  465. ? cleanup_master (chn->master)
  466. : cleanup_slave (chn->slave);
  467. GNUNET_free (chn);
  468. }
  469. /**
  470. * Called whenever a client is disconnected.
  471. * Frees our resources associated with that client.
  472. *
  473. * @param cls closure
  474. * @param client identification of the client
  475. * @param app_ctx must match @a client
  476. */
  477. static void
  478. client_notify_disconnect (void *cls,
  479. struct GNUNET_SERVICE_Client *client,
  480. void *app_ctx)
  481. {
  482. struct Client *c = app_ctx;
  483. struct Channel *chn = c->channel;
  484. GNUNET_free (c);
  485. if (NULL == chn)
  486. {
  487. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  488. "%p User context is NULL in client_notify_disconnect ()\n",
  489. chn);
  490. GNUNET_break (0);
  491. return;
  492. }
  493. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  494. "%p Client %p (%s) disconnected from channel %s\n",
  495. chn,
  496. client,
  497. (GNUNET_YES == chn->is_master) ? "master" : "slave",
  498. GNUNET_h2s (&chn->pub_key_hash));
  499. struct ClientList *cli = chn->clients_head;
  500. while (NULL != cli)
  501. {
  502. if (cli->client == client)
  503. {
  504. GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
  505. GNUNET_free (cli);
  506. break;
  507. }
  508. cli = cli->next;
  509. }
  510. struct Operation *op = chn->op_head;
  511. while (NULL != op)
  512. {
  513. if (op->client == client)
  514. {
  515. op->client = NULL;
  516. break;
  517. }
  518. op = op->next;
  519. }
  520. if (NULL == chn->clients_head)
  521. { /* Last client disconnected. */
  522. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  523. "%p Last client (%s) disconnected from channel %s\n",
  524. chn,
  525. (GNUNET_YES == chn->is_master) ? "master" : "slave",
  526. GNUNET_h2s (&chn->pub_key_hash));
  527. chn->is_disconnecting = GNUNET_YES;
  528. cleanup_channel (chn);
  529. }
  530. }
  531. /**
  532. * A new client connected.
  533. *
  534. * @param cls NULL
  535. * @param client client to add
  536. * @param mq message queue for @a client
  537. * @return @a client
  538. */
  539. static void *
  540. client_notify_connect (void *cls,
  541. struct GNUNET_SERVICE_Client *client,
  542. struct GNUNET_MQ_Handle *mq)
  543. {
  544. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
  545. struct Client *c = GNUNET_malloc (sizeof (*c));
  546. c->client = client;
  547. return c;
  548. }
  549. /**
  550. * Send message to all clients connected to the channel.
  551. */
  552. static void
  553. client_send_msg (const struct Channel *chn,
  554. const struct GNUNET_MessageHeader *msg)
  555. {
  556. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  557. "Sending message to clients of channel %p.\n",
  558. chn);
  559. struct ClientList *cli = chn->clients_head;
  560. while (NULL != cli)
  561. {
  562. struct GNUNET_MQ_Envelope *
  563. env = GNUNET_MQ_msg_copy (msg);
  564. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
  565. env);
  566. cli = cli->next;
  567. }
  568. }
  569. /**
  570. * Send a result code back to the client.
  571. *
  572. * @param client
  573. * Client that should receive the result code.
  574. * @param result_code
  575. * Code to transmit.
  576. * @param op_id
  577. * Operation ID in network byte order.
  578. * @param data
  579. * Data payload or NULL.
  580. * @param data_size
  581. * Size of @a data.
  582. */
  583. static void
  584. client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
  585. int64_t result_code, const void *data, uint16_t data_size)
  586. {
  587. struct GNUNET_OperationResultMessage *res;
  588. struct GNUNET_MQ_Envelope *
  589. env = GNUNET_MQ_msg_extra (res,
  590. data_size,
  591. GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
  592. res->result_code = GNUNET_htonll (result_code);
  593. res->op_id = op_id;
  594. if (0 < data_size)
  595. GNUNET_memcpy (&res[1], data, data_size);
  596. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  597. "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
  598. client,
  599. GNUNET_ntohll (op_id),
  600. result_code,
  601. data_size);
  602. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
  603. }
  604. /**
  605. * Closure for join_mem_test_cb()
  606. */
  607. struct JoinMemTestClosure
  608. {
  609. struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
  610. struct Channel *channel;
  611. struct GNUNET_MULTICAST_JoinHandle *join_handle;
  612. struct GNUNET_PSYC_JoinRequestMessage *join_msg;
  613. };
  614. /**
  615. * Membership test result callback used for join requests.
  616. */
  617. static void
  618. join_mem_test_cb (void *cls, int64_t result,
  619. const char *err_msg, uint16_t err_msg_size)
  620. {
  621. struct JoinMemTestClosure *jcls = cls;
  622. if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
  623. { /* Pass on join request to client if this is a master channel */
  624. struct Master *mst = jcls->channel->master;
  625. struct GNUNET_HashCode slave_pub_hash;
  626. GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
  627. &slave_pub_hash);
  628. GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
  629. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  630. client_send_msg (jcls->channel, &jcls->join_msg->header);
  631. }
  632. else
  633. {
  634. if (GNUNET_SYSERR == result)
  635. {
  636. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  637. "Could not perform membership test (%.*s)\n",
  638. err_msg_size, err_msg);
  639. }
  640. // FIXME: add relays
  641. GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
  642. }
  643. GNUNET_free (jcls->join_msg);
  644. GNUNET_free (jcls);
  645. }
  646. /**
  647. * Incoming join request from multicast.
  648. */
  649. static void
  650. mcast_recv_join_request (void *cls,
  651. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
  652. const struct GNUNET_MessageHeader *join_msg,
  653. struct GNUNET_MULTICAST_JoinHandle *jh)
  654. {
  655. struct Channel *chn = cls;
  656. uint16_t join_msg_size = 0;
  657. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  658. "%p Got join request.\n",
  659. chn);
  660. if (NULL != join_msg)
  661. {
  662. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
  663. {
  664. join_msg_size = ntohs (join_msg->size);
  665. }
  666. else
  667. {
  668. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  669. "%p Got join message with invalid type %u.\n",
  670. chn,
  671. ntohs (join_msg->type));
  672. }
  673. }
  674. struct GNUNET_PSYC_JoinRequestMessage *
  675. req = GNUNET_malloc (sizeof (*req) + join_msg_size);
  676. req->header.size = htons (sizeof (*req) + join_msg_size);
  677. req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
  678. req->slave_pub_key = *slave_pub_key;
  679. if (0 < join_msg_size)
  680. GNUNET_memcpy (&req[1], join_msg, join_msg_size);
  681. struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
  682. jcls->slave_pub_key = *slave_pub_key;
  683. jcls->channel = chn;
  684. jcls->join_handle = jh;
  685. jcls->join_msg = req;
  686. GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
  687. chn->max_message_id, 0,
  688. &join_mem_test_cb, jcls);
  689. }
  690. /**
  691. * Join decision received from multicast.
  692. */
  693. static void
  694. mcast_recv_join_decision (void *cls, int is_admitted,
  695. const struct GNUNET_PeerIdentity *peer,
  696. uint16_t relay_count,
  697. const struct GNUNET_PeerIdentity *relays,
  698. const struct GNUNET_MessageHeader *join_resp)
  699. {
  700. struct Slave *slv = cls;
  701. struct Channel *chn = &slv->channel;
  702. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  703. "%p Got join decision: %d\n",
  704. slv,
  705. is_admitted);
  706. if (GNUNET_YES == chn->is_ready)
  707. {
  708. /* Already admitted */
  709. return;
  710. }
  711. uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
  712. struct GNUNET_PSYC_JoinDecisionMessage *
  713. dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
  714. dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
  715. dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
  716. dcsn->is_admitted = htonl (is_admitted);
  717. if (0 < join_resp_size)
  718. GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
  719. client_send_msg (chn, &dcsn->header);
  720. if (GNUNET_YES == is_admitted
  721. && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
  722. {
  723. chn->is_ready = GNUNET_YES;
  724. }
  725. }
  726. static int
  727. store_recv_fragment_replay (void *cls,
  728. struct GNUNET_MULTICAST_MessageHeader *msg,
  729. enum GNUNET_PSYCSTORE_MessageFlags flags)
  730. {
  731. struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
  732. GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
  733. return GNUNET_YES;
  734. }
  735. /**
  736. * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
  737. */
  738. static void
  739. store_recv_fragment_replay_result (void *cls,
  740. int64_t result,
  741. const char *err_msg,
  742. uint16_t err_msg_size)
  743. {
  744. struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
  745. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  746. "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
  747. rh,
  748. result,
  749. err_msg_size,
  750. err_msg);
  751. switch (result)
  752. {
  753. case GNUNET_YES:
  754. break;
  755. case GNUNET_NO:
  756. GNUNET_MULTICAST_replay_response (rh, NULL,
  757. GNUNET_MULTICAST_REC_NOT_FOUND);
  758. return;
  759. case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
  760. GNUNET_MULTICAST_replay_response (rh, NULL,
  761. GNUNET_MULTICAST_REC_ACCESS_DENIED);
  762. return;
  763. case GNUNET_SYSERR:
  764. GNUNET_MULTICAST_replay_response (rh, NULL,
  765. GNUNET_MULTICAST_REC_INTERNAL_ERROR);
  766. return;
  767. }
  768. /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
  769. * an error code, so it must be ensured no further processing
  770. * is attempted on 'rh'. Maybe this should be refactored as
  771. * it doesn't look very intuitive. --lynX
  772. */
  773. GNUNET_MULTICAST_replay_response_end (rh);
  774. }
  775. /**
  776. * Incoming fragment replay request from multicast.
  777. */
  778. static void
  779. mcast_recv_replay_fragment (void *cls,
  780. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
  781. uint64_t fragment_id, uint64_t flags,
  782. struct GNUNET_MULTICAST_ReplayHandle *rh)
  783. {
  784. struct Channel *chn = cls;
  785. GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
  786. fragment_id, fragment_id,
  787. &store_recv_fragment_replay,
  788. &store_recv_fragment_replay_result, rh);
  789. }
  790. /**
  791. * Incoming message replay request from multicast.
  792. */
  793. static void
  794. mcast_recv_replay_message (void *cls,
  795. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
  796. uint64_t message_id,
  797. uint64_t fragment_offset,
  798. uint64_t flags,
  799. struct GNUNET_MULTICAST_ReplayHandle *rh)
  800. {
  801. struct Channel *chn = cls;
  802. GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
  803. message_id, message_id, 1, NULL,
  804. &store_recv_fragment_replay,
  805. &store_recv_fragment_replay_result, rh);
  806. }
  807. /**
  808. * Convert an uint64_t in network byte order to a HashCode
  809. * that can be used as key in a MultiHashMap
  810. */
  811. static inline void
  812. hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
  813. {
  814. /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
  815. /* TODO: use built-in byte swap functions if available */
  816. n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
  817. n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
  818. *key = (struct GNUNET_HashCode) {};
  819. *((uint64_t *) key)
  820. = (n << 32) | (n >> 32);
  821. }
  822. /**
  823. * Convert an uint64_t in host byte order to a HashCode
  824. * that can be used as key in a MultiHashMap
  825. */
  826. static inline void
  827. hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
  828. {
  829. #if __BYTE_ORDER == __BIG_ENDIAN
  830. hash_key_from_nll (key, n);
  831. #elif __BYTE_ORDER == __LITTLE_ENDIAN
  832. *key = (struct GNUNET_HashCode) {};
  833. *((uint64_t *) key) = n;
  834. #else
  835. #error byteorder undefined
  836. #endif
  837. }
  838. /**
  839. * Initialize PSYC message header.
  840. */
  841. static inline void
  842. psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
  843. const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
  844. {
  845. uint16_t size = ntohs (mmsg->header.size);
  846. uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
  847. pmsg->header.size = htons (psize);
  848. pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
  849. pmsg->message_id = mmsg->message_id;
  850. pmsg->fragment_offset = mmsg->fragment_offset;
  851. pmsg->flags = htonl (flags);
  852. GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
  853. }
  854. /**
  855. * Create a new PSYC message from a multicast message for sending it to clients.
  856. */
  857. static inline struct GNUNET_PSYC_MessageHeader *
  858. psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
  859. {
  860. struct GNUNET_PSYC_MessageHeader *pmsg;
  861. uint16_t size = ntohs (mmsg->header.size);
  862. uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
  863. pmsg = GNUNET_malloc (psize);
  864. psyc_msg_init (pmsg, mmsg, flags);
  865. return pmsg;
  866. }
  867. /**
  868. * Send multicast message to all clients connected to the channel.
  869. */
  870. static void
  871. client_send_mcast_msg (struct Channel *chn,
  872. const struct GNUNET_MULTICAST_MessageHeader *mmsg,
  873. uint32_t flags)
  874. {
  875. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  876. "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
  877. chn,
  878. GNUNET_ntohll (mmsg->fragment_id),
  879. GNUNET_ntohll (mmsg->message_id));
  880. struct GNUNET_PSYC_MessageHeader *
  881. pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
  882. client_send_msg (chn, &pmsg->header);
  883. GNUNET_free (pmsg);
  884. }
  885. /**
  886. * Send multicast request to all clients connected to the channel.
  887. */
  888. static void
  889. client_send_mcast_req (struct Master *mst,
  890. const struct GNUNET_MULTICAST_RequestHeader *req)
  891. {
  892. struct Channel *chn = &mst->channel;
  893. struct GNUNET_PSYC_MessageHeader *pmsg;
  894. uint16_t size = ntohs (req->header.size);
  895. uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
  896. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  897. "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
  898. chn,
  899. GNUNET_ntohll (req->fragment_id),
  900. GNUNET_ntohll (req->request_id));
  901. pmsg = GNUNET_malloc (psize);
  902. pmsg->header.size = htons (psize);
  903. pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
  904. pmsg->message_id = req->request_id;
  905. pmsg->fragment_offset = req->fragment_offset;
  906. pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
  907. pmsg->slave_pub_key = req->member_pub_key;
  908. GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
  909. client_send_msg (chn, &pmsg->header);
  910. /* FIXME: save req to PSYCstore so that it can be resent later to clients */
  911. GNUNET_free (pmsg);
  912. }
  913. /**
  914. * Insert a multicast message fragment into the queue belonging to the message.
  915. *
  916. * @param chn Channel.
  917. * @param mmsg Multicast message fragment.
  918. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
  919. * @param first_ptype First PSYC message part type in @a mmsg.
  920. * @param last_ptype Last PSYC message part type in @a mmsg.
  921. */
  922. static void
  923. fragment_queue_insert (struct Channel *chn,
  924. const struct GNUNET_MULTICAST_MessageHeader *mmsg,
  925. uint16_t first_ptype, uint16_t last_ptype)
  926. {
  927. const uint16_t size = ntohs (mmsg->header.size);
  928. const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
  929. struct GNUNET_CONTAINER_MultiHashMap
  930. *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
  931. &chn->pub_key_hash);
  932. struct GNUNET_HashCode msg_id_hash;
  933. hash_key_from_nll (&msg_id_hash, mmsg->message_id);
  934. struct FragmentQueue
  935. *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  936. if (NULL == fragq)
  937. {
  938. fragq = GNUNET_malloc (sizeof (*fragq));
  939. fragq->state = MSG_FRAG_STATE_HEADER;
  940. fragq->fragments
  941. = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  942. GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
  943. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  944. if (NULL == chan_msgs)
  945. {
  946. chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  947. GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
  948. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  949. }
  950. }
  951. struct GNUNET_HashCode frag_id_hash;
  952. hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
  953. struct RecvCacheEntry
  954. *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
  955. if (NULL == cache_entry)
  956. {
  957. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  958. "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
  959. chn,
  960. GNUNET_ntohll (mmsg->message_id),
  961. GNUNET_ntohll (mmsg->fragment_id));
  962. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  963. "%p header_size: %" PRIu64 " + %u\n",
  964. chn,
  965. fragq->header_size,
  966. size);
  967. cache_entry = GNUNET_malloc (sizeof (*cache_entry));
  968. cache_entry->ref_count = 1;
  969. cache_entry->mmsg = GNUNET_malloc (size);
  970. GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
  971. GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
  972. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  973. }
  974. else
  975. {
  976. cache_entry->ref_count++;
  977. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  978. "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
  979. chn,
  980. GNUNET_ntohll (mmsg->message_id),
  981. GNUNET_ntohll (mmsg->fragment_id),
  982. cache_entry->ref_count);
  983. }
  984. if (MSG_FRAG_STATE_HEADER == fragq->state)
  985. {
  986. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  987. {
  988. struct GNUNET_PSYC_MessageMethod *
  989. pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
  990. fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
  991. fragq->flags = ntohl (pmeth->flags);
  992. }
  993. if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
  994. {
  995. fragq->header_size += size;
  996. }
  997. else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
  998. || frag_offset == fragq->header_size)
  999. { /* header is now complete */
  1000. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1001. "%p Header of message %" PRIu64 " is complete.\n",
  1002. chn,
  1003. GNUNET_ntohll (mmsg->message_id));
  1004. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1005. "%p Adding message %" PRIu64 " to queue.\n",
  1006. chn,
  1007. GNUNET_ntohll (mmsg->message_id));
  1008. fragq->state = MSG_FRAG_STATE_DATA;
  1009. }
  1010. else
  1011. {
  1012. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1013. "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
  1014. chn,
  1015. GNUNET_ntohll (mmsg->message_id),
  1016. frag_offset,
  1017. fragq->header_size);
  1018. }
  1019. }
  1020. switch (last_ptype)
  1021. {
  1022. case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
  1023. if (frag_offset == fragq->size)
  1024. fragq->state = MSG_FRAG_STATE_END;
  1025. else
  1026. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1027. "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
  1028. chn,
  1029. GNUNET_ntohll (mmsg->message_id),
  1030. frag_offset,
  1031. fragq->size);
  1032. break;
  1033. case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
  1034. /* Drop message without delivering to client if it's a single fragment */
  1035. fragq->state =
  1036. (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  1037. ? MSG_FRAG_STATE_DROP
  1038. : MSG_FRAG_STATE_CANCEL;
  1039. }
  1040. switch (fragq->state)
  1041. {
  1042. case MSG_FRAG_STATE_DATA:
  1043. case MSG_FRAG_STATE_END:
  1044. case MSG_FRAG_STATE_CANCEL:
  1045. if (GNUNET_NO == fragq->is_queued)
  1046. {
  1047. GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
  1048. GNUNET_ntohll (mmsg->message_id));
  1049. fragq->is_queued = GNUNET_YES;
  1050. }
  1051. }
  1052. fragq->size += size;
  1053. GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
  1054. GNUNET_ntohll (mmsg->fragment_id));
  1055. }
  1056. /**
  1057. * Run fragment queue of a message.
  1058. *
  1059. * Send fragments of a message in order to client, after all modifiers arrived
  1060. * from multicast.
  1061. *
  1062. * @param chn
  1063. * Channel.
  1064. * @param msg_id
  1065. * ID of the message @a fragq belongs to.
  1066. * @param fragq
  1067. * Fragment queue of the message.
  1068. * @param drop
  1069. * Drop message without delivering to client?
  1070. * #GNUNET_YES or #GNUNET_NO.
  1071. */
  1072. static void
  1073. fragment_queue_run (struct Channel *chn, uint64_t msg_id,
  1074. struct FragmentQueue *fragq, uint8_t drop)
  1075. {
  1076. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1077. "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
  1078. chn,
  1079. msg_id,
  1080. fragq->state);
  1081. struct GNUNET_CONTAINER_MultiHashMap
  1082. *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
  1083. &chn->pub_key_hash);
  1084. GNUNET_assert (NULL != chan_msgs);
  1085. uint64_t frag_id;
  1086. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
  1087. &frag_id))
  1088. {
  1089. struct GNUNET_HashCode frag_id_hash;
  1090. hash_key_from_hll (&frag_id_hash, frag_id);
  1091. struct RecvCacheEntry *cache_entry
  1092. = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
  1093. if (cache_entry != NULL)
  1094. {
  1095. if (GNUNET_NO == drop)
  1096. {
  1097. client_send_mcast_msg (chn, cache_entry->mmsg, 0);
  1098. }
  1099. if (cache_entry->ref_count <= 1)
  1100. {
  1101. GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
  1102. cache_entry);
  1103. GNUNET_free (cache_entry->mmsg);
  1104. GNUNET_free (cache_entry);
  1105. }
  1106. else
  1107. {
  1108. cache_entry->ref_count--;
  1109. }
  1110. }
  1111. #if CACHE_AGING_IMPLEMENTED
  1112. else if (GNUNET_NO == drop)
  1113. {
  1114. /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
  1115. }
  1116. #endif
  1117. GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
  1118. }
  1119. if (MSG_FRAG_STATE_END <= fragq->state)
  1120. {
  1121. struct GNUNET_HashCode msg_id_hash;
  1122. hash_key_from_hll (&msg_id_hash, msg_id);
  1123. GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
  1124. GNUNET_CONTAINER_heap_destroy (fragq->fragments);
  1125. GNUNET_free (fragq);
  1126. }
  1127. else
  1128. {
  1129. fragq->is_queued = GNUNET_NO;
  1130. }
  1131. }
  1132. struct StateModifyClosure
  1133. {
  1134. struct Channel *channel;
  1135. uint64_t msg_id;
  1136. struct GNUNET_HashCode msg_id_hash;
  1137. };
  1138. void
  1139. store_recv_state_modify_result (void *cls, int64_t result,
  1140. const char *err_msg, uint16_t err_msg_size)
  1141. {
  1142. struct StateModifyClosure *mcls = cls;
  1143. struct Channel *chn = mcls->channel;
  1144. uint64_t msg_id = mcls->msg_id;
  1145. struct FragmentQueue *
  1146. fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
  1147. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1148. "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
  1149. chn, result, err_msg_size, err_msg);
  1150. switch (result)
  1151. {
  1152. case GNUNET_OK:
  1153. case GNUNET_NO:
  1154. if (NULL != fragq)
  1155. fragq->state_is_modified = GNUNET_YES;
  1156. if (chn->max_state_message_id < msg_id)
  1157. chn->max_state_message_id = msg_id;
  1158. if (chn->max_message_id < msg_id)
  1159. chn->max_message_id = msg_id;
  1160. if (NULL != fragq)
  1161. fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
  1162. GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
  1163. message_queue_run (chn);
  1164. break;
  1165. default:
  1166. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1167. "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
  1168. chn, result, err_msg_size, err_msg);
  1169. /** @todo FIXME: handle state_modify error */
  1170. }
  1171. }
  1172. /**
  1173. * Run message queue.
  1174. *
  1175. * Send messages in queue to client in order after a message has arrived from
  1176. * multicast, according to the following:
  1177. * - A message is only sent if all of its modifiers arrived.
  1178. * - A stateful message is only sent if the previous stateful message
  1179. * has already been delivered to the client.
  1180. *
  1181. * @param chn Channel.
  1182. *
  1183. * @return Number of messages removed from queue and sent to client.
  1184. */
  1185. static uint64_t
  1186. message_queue_run (struct Channel *chn)
  1187. {
  1188. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1189. "%p Running message queue.\n", chn);
  1190. uint64_t n = 0;
  1191. uint64_t msg_id;
  1192. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
  1193. &msg_id))
  1194. {
  1195. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1196. "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
  1197. struct GNUNET_HashCode msg_id_hash;
  1198. hash_key_from_hll (&msg_id_hash, msg_id);
  1199. struct FragmentQueue *
  1200. fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  1201. if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
  1202. {
  1203. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1204. "%p No fragq (%p) or header not complete.\n",
  1205. chn, fragq);
  1206. break;
  1207. }
  1208. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1209. "%p Fragment queue entry: state: %u, state delta: "
  1210. "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
  1211. chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
  1212. if (MSG_FRAG_STATE_DATA <= fragq->state)
  1213. {
  1214. /* Check if there's a missing message before the current one */
  1215. if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
  1216. {
  1217. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
  1218. if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
  1219. && (chn->max_message_id != msg_id - 1
  1220. && chn->max_message_id != msg_id))
  1221. {
  1222. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1223. "%p Out of order message. "
  1224. "(%" PRIu64 " != %" PRIu64 " - 1)\n",
  1225. chn, chn->max_message_id, msg_id);
  1226. break;
  1227. // FIXME: keep track of messages processed in this queue run,
  1228. // and only stop after reaching the end
  1229. }
  1230. }
  1231. else
  1232. {
  1233. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
  1234. if (GNUNET_YES != fragq->state_is_modified)
  1235. {
  1236. if (msg_id - fragq->state_delta != chn->max_state_message_id)
  1237. {
  1238. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1239. "%p Out of order stateful message. "
  1240. "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
  1241. chn, msg_id, fragq->state_delta, chn->max_state_message_id);
  1242. break;
  1243. // FIXME: keep track of messages processed in this queue run,
  1244. // and only stop after reaching the end
  1245. }
  1246. struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
  1247. mcls->channel = chn;
  1248. mcls->msg_id = msg_id;
  1249. mcls->msg_id_hash = msg_id_hash;
  1250. /* Apply modifiers to state in PSYCstore */
  1251. GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
  1252. fragq->state_delta,
  1253. store_recv_state_modify_result, mcls);
  1254. break; // continue after asynchronous state modify result
  1255. }
  1256. }
  1257. chn->max_message_id = msg_id;
  1258. }
  1259. fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
  1260. GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
  1261. n++;
  1262. }
  1263. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1264. "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
  1265. return n;
  1266. }
  1267. /**
  1268. * Drop message queue of a channel.
  1269. *
  1270. * Remove all messages in queue without sending it to clients.
  1271. *
  1272. * @param chn Channel.
  1273. *
  1274. * @return Number of messages removed from queue.
  1275. */
  1276. static uint64_t
  1277. message_queue_drop (struct Channel *chn)
  1278. {
  1279. uint64_t n = 0;
  1280. uint64_t msg_id;
  1281. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
  1282. &msg_id))
  1283. {
  1284. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1285. "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
  1286. struct GNUNET_HashCode msg_id_hash;
  1287. hash_key_from_hll (&msg_id_hash, msg_id);
  1288. struct FragmentQueue *
  1289. fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  1290. GNUNET_assert (NULL != fragq);
  1291. fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
  1292. GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
  1293. n++;
  1294. }
  1295. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1296. "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
  1297. return n;
  1298. }
  1299. /**
  1300. * Received result of GNUNET_PSYCSTORE_fragment_store().
  1301. */
  1302. static void
  1303. store_recv_fragment_store_result (void *cls, int64_t result,
  1304. const char *err_msg, uint16_t err_msg_size)
  1305. {
  1306. struct Channel *chn = cls;
  1307. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1308. "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
  1309. chn, result, err_msg_size, err_msg);
  1310. }
  1311. /**
  1312. * Handle incoming message fragment from multicast.
  1313. *
  1314. * Store it using PSYCstore and send it to the clients of the channel in order.
  1315. */
  1316. static void
  1317. mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
  1318. {
  1319. struct Channel *chn = cls;
  1320. uint16_t size = ntohs (mmsg->header.size);
  1321. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1322. "%p Received multicast message of size %u. "
  1323. "fragment_id=%" PRIu64 ", message_id=%" PRIu64
  1324. ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
  1325. chn, size,
  1326. GNUNET_ntohll (mmsg->fragment_id),
  1327. GNUNET_ntohll (mmsg->message_id),
  1328. GNUNET_ntohll (mmsg->fragment_offset),
  1329. GNUNET_ntohll (mmsg->flags));
  1330. GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
  1331. &store_recv_fragment_store_result, chn);
  1332. uint16_t first_ptype = 0, last_ptype = 0;
  1333. int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
  1334. (const char *) &mmsg[1],
  1335. &first_ptype, &last_ptype);
  1336. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1337. "%p Message check result %d, first part type %u, last part type %u\n",
  1338. chn, check, first_ptype, last_ptype);
  1339. if (GNUNET_SYSERR == check)
  1340. {
  1341. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1342. "%p Dropping incoming multicast message with invalid parts.\n",
  1343. chn);
  1344. GNUNET_break_op (0);
  1345. return;
  1346. }
  1347. fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
  1348. message_queue_run (chn);
  1349. }
  1350. /**
  1351. * Incoming request fragment from multicast for a master.
  1352. *
  1353. * @param cls Master.
  1354. * @param req The request.
  1355. */
  1356. static void
  1357. mcast_recv_request (void *cls,
  1358. const struct GNUNET_MULTICAST_RequestHeader *req)
  1359. {
  1360. struct Master *mst = cls;
  1361. uint16_t size = ntohs (req->header.size);
  1362. char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
  1363. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1364. "%p Received multicast request of size %u from %s.\n",
  1365. mst, size, str);
  1366. GNUNET_free (str);
  1367. uint16_t first_ptype = 0, last_ptype = 0;
  1368. if (GNUNET_SYSERR
  1369. == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
  1370. (const char *) &req[1],
  1371. &first_ptype, &last_ptype))
  1372. {
  1373. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1374. "%p Dropping incoming multicast request with invalid parts.\n",
  1375. mst);
  1376. GNUNET_break_op (0);
  1377. return;
  1378. }
  1379. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1380. "Message parts: first: type %u, last: type %u\n",
  1381. first_ptype, last_ptype);
  1382. /* FIXME: in-order delivery */
  1383. client_send_mcast_req (mst, req);
  1384. }
  1385. /**
  1386. * Response from PSYCstore with the current counter values for a channel master.
  1387. */
  1388. static void
  1389. store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
  1390. uint64_t max_message_id, uint64_t max_group_generation,
  1391. uint64_t max_state_message_id)
  1392. {
  1393. struct Master *mst = cls;
  1394. struct Channel *chn = &mst->channel;
  1395. chn->store_op = NULL;
  1396. struct GNUNET_PSYC_CountersResultMessage res;
  1397. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
  1398. res.header.size = htons (sizeof (res));
  1399. res.result_code = htonl (result);
  1400. res.max_message_id = GNUNET_htonll (max_message_id);
  1401. if (GNUNET_OK == result || GNUNET_NO == result)
  1402. {
  1403. mst->max_message_id = max_message_id;
  1404. chn->max_message_id = max_message_id;
  1405. chn->max_state_message_id = max_state_message_id;
  1406. mst->max_group_generation = max_group_generation;
  1407. mst->origin
  1408. = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
  1409. mcast_recv_join_request,
  1410. mcast_recv_replay_fragment,
  1411. mcast_recv_replay_message,
  1412. mcast_recv_request,
  1413. mcast_recv_message, chn);
  1414. chn->is_ready = GNUNET_YES;
  1415. }
  1416. else
  1417. {
  1418. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1419. "%p GNUNET_PSYCSTORE_counters_get() "
  1420. "returned %d for channel %s.\n",
  1421. chn, result, GNUNET_h2s (&chn->pub_key_hash));
  1422. }
  1423. client_send_msg (chn, &res.header);
  1424. }
  1425. /**
  1426. * Response from PSYCstore with the current counter values for a channel slave.
  1427. */
  1428. void
  1429. store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
  1430. uint64_t max_message_id, uint64_t max_group_generation,
  1431. uint64_t max_state_message_id)
  1432. {
  1433. struct Slave *slv = cls;
  1434. struct Channel *chn = &slv->channel;
  1435. chn->store_op = NULL;
  1436. struct GNUNET_PSYC_CountersResultMessage res;
  1437. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
  1438. res.header.size = htons (sizeof (res));
  1439. res.result_code = htonl (result);
  1440. res.max_message_id = GNUNET_htonll (max_message_id);
  1441. if (GNUNET_YES == result || GNUNET_NO == result)
  1442. {
  1443. chn->max_message_id = max_message_id;
  1444. chn->max_state_message_id = max_state_message_id;
  1445. slv->member
  1446. = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
  1447. &slv->origin,
  1448. slv->relay_count, slv->relays,
  1449. &slv->join_msg->header,
  1450. mcast_recv_join_request,
  1451. mcast_recv_join_decision,
  1452. mcast_recv_replay_fragment,
  1453. mcast_recv_replay_message,
  1454. mcast_recv_message, chn);
  1455. if (NULL != slv->join_msg)
  1456. {
  1457. GNUNET_free (slv->join_msg);
  1458. slv->join_msg = NULL;
  1459. }
  1460. }
  1461. else
  1462. {
  1463. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1464. "%p GNUNET_PSYCSTORE_counters_get() "
  1465. "returned %d for channel %s.\n",
  1466. chn, result, GNUNET_h2s (&chn->pub_key_hash));
  1467. }
  1468. client_send_msg (chn, &res.header);
  1469. }
  1470. static void
  1471. channel_init (struct Channel *chn)
  1472. {
  1473. chn->recv_msgs
  1474. = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  1475. chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  1476. }
  1477. /**
  1478. * Handle a connecting client starting a channel master.
  1479. */
  1480. static void
  1481. handle_client_master_start (void *cls,
  1482. const struct MasterStartRequest *req)
  1483. {
  1484. struct Client *c = cls;
  1485. struct GNUNET_SERVICE_Client *client = c->client;
  1486. struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
  1487. struct GNUNET_HashCode pub_key_hash;
  1488. GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
  1489. GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
  1490. struct Master *
  1491. mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
  1492. struct Channel *chn;
  1493. if (NULL == mst)
  1494. {
  1495. mst = GNUNET_malloc (sizeof (*mst));
  1496. mst->policy = ntohl (req->policy);
  1497. mst->priv_key = req->channel_key;
  1498. mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  1499. chn = c->channel = &mst->channel;
  1500. chn->master = mst;
  1501. chn->is_master = GNUNET_YES;
  1502. chn->pub_key = pub_key;
  1503. chn->pub_key_hash = pub_key_hash;
  1504. channel_init (chn);
  1505. GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
  1506. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  1507. chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
  1508. store_recv_master_counters, mst);
  1509. }
  1510. else
  1511. {
  1512. chn = &mst->channel;
  1513. struct GNUNET_PSYC_CountersResultMessage *res;
  1514. struct GNUNET_MQ_Envelope *
  1515. env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
  1516. res->result_code = htonl (GNUNET_OK);
  1517. res->max_message_id = GNUNET_htonll (mst->max_message_id);
  1518. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
  1519. }
  1520. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1521. "%p Client connected as master to channel %s.\n",
  1522. mst, GNUNET_h2s (&chn->pub_key_hash));
  1523. struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
  1524. cli->client = client;
  1525. GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
  1526. GNUNET_SERVICE_client_continue (client);
  1527. }
  1528. static int
  1529. check_client_slave_join (void *cls,
  1530. const struct SlaveJoinRequest *req)
  1531. {
  1532. return GNUNET_OK;
  1533. }
  1534. /**
  1535. * Handle a connecting client joining as a channel slave.
  1536. */
  1537. static void
  1538. handle_client_slave_join (void *cls,
  1539. const struct SlaveJoinRequest *req)
  1540. {
  1541. struct Client *c = cls;
  1542. struct GNUNET_SERVICE_Client *client = c->client;
  1543. uint16_t req_size = ntohs (req->header.size);
  1544. struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
  1545. struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
  1546. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1547. "got join request from client %p\n",
  1548. client);
  1549. GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
  1550. GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
  1551. GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
  1552. struct GNUNET_CONTAINER_MultiHashMap *
  1553. chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
  1554. struct Slave *slv = NULL;
  1555. struct Channel *chn;
  1556. if (NULL != chn_slv)
  1557. {
  1558. slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
  1559. }
  1560. if (NULL == slv)
  1561. {
  1562. slv = GNUNET_malloc (sizeof (*slv));
  1563. slv->priv_key = req->slave_key;
  1564. slv->pub_key = slv_pub_key;
  1565. slv->pub_key_hash = slv_pub_hash;
  1566. slv->origin = req->origin;
  1567. slv->relay_count = ntohl (req->relay_count);
  1568. slv->join_flags = ntohl (req->flags);
  1569. const struct GNUNET_PeerIdentity *
  1570. relays = (const struct GNUNET_PeerIdentity *) &req[1];
  1571. uint16_t relay_size = slv->relay_count * sizeof (*relays);
  1572. uint16_t join_msg_size = 0;
  1573. if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
  1574. <= req_size)
  1575. {
  1576. struct GNUNET_PSYC_Message *
  1577. join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
  1578. join_msg_size = ntohs (join_msg->header.size);
  1579. slv->join_msg = GNUNET_malloc (join_msg_size);
  1580. GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
  1581. }
  1582. if (sizeof (*req) + relay_size + join_msg_size != req_size)
  1583. {
  1584. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1585. "%u + %u + %u != %u\n",
  1586. (unsigned int) sizeof (*req),
  1587. relay_size,
  1588. join_msg_size,
  1589. req_size);
  1590. GNUNET_break (0);
  1591. GNUNET_SERVICE_client_drop (client);
  1592. GNUNET_free (slv);
  1593. return;
  1594. }
  1595. if (0 < slv->relay_count)
  1596. {
  1597. slv->relays = GNUNET_malloc (relay_size);
  1598. GNUNET_memcpy (slv->relays, &req[1], relay_size);
  1599. }
  1600. chn = c->channel = &slv->channel;
  1601. chn->slave = slv;
  1602. chn->is_master = GNUNET_NO;
  1603. chn->pub_key = req->channel_pub_key;
  1604. chn->pub_key_hash = pub_key_hash;
  1605. channel_init (chn);
  1606. if (NULL == chn_slv)
  1607. {
  1608. chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  1609. GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
  1610. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  1611. }
  1612. GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
  1613. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  1614. GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
  1615. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  1616. chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
  1617. &store_recv_slave_counters, slv);
  1618. }
  1619. else
  1620. {
  1621. chn = &slv->channel;
  1622. struct GNUNET_PSYC_CountersResultMessage *res;
  1623. struct GNUNET_MQ_Envelope *
  1624. env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
  1625. res->result_code = htonl (GNUNET_OK);
  1626. res->max_message_id = GNUNET_htonll (chn->max_message_id);
  1627. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
  1628. if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
  1629. {
  1630. mcast_recv_join_decision (slv, GNUNET_YES,
  1631. NULL, 0, NULL, NULL);
  1632. }
  1633. else if (NULL == slv->member)
  1634. {
  1635. slv->member
  1636. = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
  1637. &slv->origin,
  1638. slv->relay_count, slv->relays,
  1639. &slv->join_msg->header,
  1640. &mcast_recv_join_request,
  1641. &mcast_recv_join_decision,
  1642. &mcast_recv_replay_fragment,
  1643. &mcast_recv_replay_message,
  1644. &mcast_recv_message, chn);
  1645. if (NULL != slv->join_msg)
  1646. {
  1647. GNUNET_free (slv->join_msg);
  1648. slv->join_msg = NULL;
  1649. }
  1650. }
  1651. else if (NULL != slv->join_dcsn)
  1652. {
  1653. struct GNUNET_MQ_Envelope *
  1654. env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
  1655. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
  1656. }
  1657. }
  1658. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1659. "Client %p connected as slave to channel %s.\n",
  1660. client,
  1661. GNUNET_h2s (&chn->pub_key_hash));
  1662. struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
  1663. cli->client = client;
  1664. GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
  1665. GNUNET_SERVICE_client_continue (client);
  1666. }
  1667. struct JoinDecisionClosure
  1668. {
  1669. int32_t is_admitted;
  1670. struct GNUNET_MessageHeader *msg;
  1671. };
  1672. /**
  1673. * Iterator callback for sending join decisions to multicast.
  1674. */
  1675. static int
  1676. mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
  1677. void *value)
  1678. {
  1679. struct JoinDecisionClosure *jcls = cls;
  1680. struct GNUNET_MULTICAST_JoinHandle *jh = value;
  1681. // FIXME: add relays
  1682. GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
  1683. return GNUNET_YES;
  1684. }
  1685. static int
  1686. check_client_join_decision (void *cls,
  1687. const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
  1688. {
  1689. return GNUNET_OK;
  1690. }
  1691. /**
  1692. * Join decision from client.
  1693. */
  1694. static void
  1695. handle_client_join_decision (void *cls,
  1696. const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
  1697. {
  1698. struct Client *c = cls;
  1699. struct GNUNET_SERVICE_Client *client = c->client;
  1700. struct Channel *chn = c->channel;
  1701. if (NULL == chn)
  1702. {
  1703. GNUNET_break (0);
  1704. GNUNET_SERVICE_client_drop (client);
  1705. return;
  1706. }
  1707. GNUNET_assert (GNUNET_YES == chn->is_master);
  1708. struct Master *mst = chn->master;
  1709. struct JoinDecisionClosure jcls;
  1710. jcls.is_admitted = ntohl (dcsn->is_admitted);
  1711. jcls.msg
  1712. = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
  1713. ? (struct GNUNET_MessageHeader *) &dcsn[1]
  1714. : NULL;
  1715. struct GNUNET_HashCode slave_pub_hash;
  1716. GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
  1717. &slave_pub_hash);
  1718. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1719. "%p Got join decision (%d) from client for channel %s..\n",
  1720. mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
  1721. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1722. "%p ..and slave %s.\n",
  1723. mst, GNUNET_h2s (&slave_pub_hash));
  1724. GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
  1725. &mcast_send_join_decision, &jcls);
  1726. GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
  1727. GNUNET_SERVICE_client_continue (client);
  1728. }
  1729. static void
  1730. channel_part_cb (void *cls)
  1731. {
  1732. struct GNUNET_SERVICE_Client *client = cls;
  1733. struct GNUNET_MQ_Envelope *env;
  1734. env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
  1735. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
  1736. env);
  1737. }
  1738. static void
  1739. handle_client_part_request (void *cls,
  1740. const struct GNUNET_MessageHeader *msg)
  1741. {
  1742. struct Client *c = cls;
  1743. c->channel->is_disconnecting = GNUNET_YES;
  1744. if (GNUNET_YES == c->channel->is_master)
  1745. {
  1746. struct Master *mst = (struct Master *) c->channel;
  1747. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1748. "Got part request from master %p\n",
  1749. mst);
  1750. GNUNET_assert (NULL != mst->origin);
  1751. GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
  1752. }
  1753. else
  1754. {
  1755. struct Slave *slv = (struct Slave *) c->channel;
  1756. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1757. "Got part request from slave %p\n",
  1758. slv);
  1759. GNUNET_assert (NULL != slv->member);
  1760. GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
  1761. }
  1762. GNUNET_SERVICE_client_continue (c->client);
  1763. }
  1764. /**
  1765. * Send acknowledgement to a client.
  1766. *
  1767. * Sent after a message fragment has been passed on to multicast.
  1768. *
  1769. * @param chn The channel struct for the client.
  1770. */
  1771. static void
  1772. send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
  1773. {
  1774. struct GNUNET_MessageHeader *res;
  1775. struct GNUNET_MQ_Envelope *
  1776. env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
  1777. /* FIXME? */
  1778. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
  1779. }
  1780. /**
  1781. * Callback for the transmit functions of multicast.
  1782. */
  1783. static int
  1784. transmit_notify (void *cls, size_t *data_size, void *data)
  1785. {
  1786. struct Channel *chn = cls;
  1787. struct TransmitMessage *tmit_msg = chn->tmit_head;
  1788. if (NULL == tmit_msg || *data_size < tmit_msg->size)
  1789. {
  1790. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1791. "%p transmit_notify: nothing to send.\n", chn);
  1792. if (NULL != tmit_msg && *data_size < tmit_msg->size)
  1793. GNUNET_break (0);
  1794. *data_size = 0;
  1795. return GNUNET_NO;
  1796. }
  1797. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1798. "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
  1799. *data_size = tmit_msg->size;
  1800. GNUNET_memcpy (data, &tmit_msg[1], *data_size);
  1801. int ret
  1802. = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
  1803. ? GNUNET_NO
  1804. : GNUNET_YES;
  1805. /* FIXME: handle disconnecting clients */
  1806. if (NULL != tmit_msg->client)
  1807. send_message_ack (chn, tmit_msg->client);
  1808. GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
  1809. if (NULL != chn->tmit_head)
  1810. {
  1811. GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
  1812. }
  1813. else if (GNUNET_YES == chn->is_disconnecting
  1814. && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
  1815. {
  1816. /* FIXME: handle partial message (when still in_transmit) */
  1817. GNUNET_free (tmit_msg);
  1818. return GNUNET_SYSERR;
  1819. }
  1820. GNUNET_free (tmit_msg);
  1821. return ret;
  1822. }
  1823. /**
  1824. * Callback for the transmit functions of multicast.
  1825. */
  1826. static int
  1827. master_transmit_notify (void *cls, size_t *data_size, void *data)
  1828. {
  1829. int ret = transmit_notify (cls, data_size, data);
  1830. if (GNUNET_YES == ret)
  1831. {
  1832. struct Master *mst = cls;
  1833. mst->tmit_handle = NULL;
  1834. }
  1835. return ret;
  1836. }
  1837. /**
  1838. * Callback for the transmit functions of multicast.
  1839. */
  1840. static int
  1841. slave_transmit_notify (void *cls, size_t *data_size, void *data)
  1842. {
  1843. int ret = transmit_notify (cls, data_size, data);
  1844. if (GNUNET_YES == ret)
  1845. {
  1846. struct Slave *slv = cls;
  1847. slv->tmit_handle = NULL;
  1848. }
  1849. return ret;
  1850. }
  1851. /**
  1852. * Transmit a message from a channel master to the multicast group.
  1853. */
  1854. static void
  1855. master_transmit_message (struct Master *mst)
  1856. {
  1857. struct Channel *chn = &mst->channel;
  1858. struct TransmitMessage *tmit_msg = chn->tmit_head;
  1859. if (NULL == tmit_msg)
  1860. return;
  1861. if (NULL == mst->tmit_handle)
  1862. {
  1863. mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
  1864. tmit_msg->id,
  1865. mst->max_group_generation,
  1866. &master_transmit_notify,
  1867. mst);
  1868. }
  1869. else
  1870. {
  1871. GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
  1872. }
  1873. }
  1874. /**
  1875. * Transmit a message from a channel slave to the multicast group.
  1876. */
  1877. static void
  1878. slave_transmit_message (struct Slave *slv)
  1879. {
  1880. if (NULL == slv->channel.tmit_head)
  1881. return;
  1882. if (NULL == slv->tmit_handle)
  1883. {
  1884. slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
  1885. slv->channel.tmit_head->id,
  1886. &slave_transmit_notify,
  1887. slv);
  1888. }
  1889. else
  1890. {
  1891. GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
  1892. }
  1893. }
  1894. static void
  1895. transmit_message (struct Channel *chn)
  1896. {
  1897. chn->is_master
  1898. ? master_transmit_message (chn->master)
  1899. : slave_transmit_message (chn->slave);
  1900. }
  1901. /**
  1902. * Queue a message from a channel master for sending to the multicast group.
  1903. */
  1904. static void
  1905. master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
  1906. {
  1907. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
  1908. {
  1909. tmit_msg->id = ++mst->max_message_id;
  1910. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1911. "%p master_queue_message: message_id=%" PRIu64 "\n",
  1912. mst, tmit_msg->id);
  1913. struct GNUNET_PSYC_MessageMethod *pmeth
  1914. = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
  1915. if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
  1916. {
  1917. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
  1918. }
  1919. else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
  1920. {
  1921. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1922. "%p master_queue_message: state_delta=%" PRIu64 "\n",
  1923. mst, tmit_msg->id - mst->max_state_message_id);
  1924. pmeth->state_delta = GNUNET_htonll (tmit_msg->id
  1925. - mst->max_state_message_id);
  1926. mst->max_state_message_id = tmit_msg->id;
  1927. }
  1928. else
  1929. {
  1930. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1931. "%p master_queue_message: state not modified\n", mst);
  1932. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
  1933. }
  1934. if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
  1935. {
  1936. /// @todo add state_hash to PSYC header
  1937. }
  1938. }
  1939. }
  1940. /**
  1941. * Queue a message from a channel slave for sending to the multicast group.
  1942. */
  1943. static void
  1944. slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
  1945. {
  1946. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
  1947. {
  1948. struct GNUNET_PSYC_MessageMethod *pmeth
  1949. = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
  1950. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
  1951. tmit_msg->id = ++slv->max_request_id;
  1952. }
  1953. }
  1954. /**
  1955. * Queue PSYC message parts for sending to multicast.
  1956. *
  1957. * @param chn
  1958. * Channel to send to.
  1959. * @param client
  1960. * Client the message originates from.
  1961. * @param data_size
  1962. * Size of @a data.
  1963. * @param data
  1964. * Concatenated message parts.
  1965. * @param first_ptype
  1966. * First message part type in @a data.
  1967. * @param last_ptype
  1968. * Last message part type in @a data.
  1969. */
  1970. static struct TransmitMessage *
  1971. queue_message (struct Channel *chn,
  1972. struct GNUNET_SERVICE_Client *client,
  1973. size_t data_size,
  1974. const void *data,
  1975. uint16_t first_ptype, uint16_t last_ptype)
  1976. {
  1977. struct TransmitMessage *
  1978. tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
  1979. GNUNET_memcpy (&tmit_msg[1], data, data_size);
  1980. tmit_msg->client = client;
  1981. tmit_msg->size = data_size;
  1982. tmit_msg->first_ptype = first_ptype;
  1983. tmit_msg->last_ptype = last_ptype;
  1984. /* FIXME: separate queue per message ID */
  1985. GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
  1986. chn->is_master
  1987. ? master_queue_message (chn->master, tmit_msg)
  1988. : slave_queue_message (chn->slave, tmit_msg);
  1989. return tmit_msg;
  1990. }
  1991. /**
  1992. * Cancel transmission of current message.
  1993. *
  1994. * @param chn Channel to send to.
  1995. * @param client Client the message originates from.
  1996. */
  1997. static void
  1998. transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
  1999. {
  2000. uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
  2001. struct GNUNET_MessageHeader msg;
  2002. msg.size = htons (sizeof (msg));
  2003. msg.type = htons (type);
  2004. queue_message (chn, client, sizeof (msg), &msg, type, type);
  2005. transmit_message (chn);
  2006. /* FIXME: cleanup */
  2007. }
  2008. static int
  2009. check_client_psyc_message (void *cls,
  2010. const struct GNUNET_MessageHeader *msg)
  2011. {
  2012. return GNUNET_OK;
  2013. }
  2014. /**
  2015. * Incoming message from a master or slave client.
  2016. */
  2017. static void
  2018. handle_client_psyc_message (void *cls,
  2019. const struct GNUNET_MessageHeader *msg)
  2020. {
  2021. struct Client *c = cls;
  2022. struct GNUNET_SERVICE_Client *client = c->client;
  2023. struct Channel *chn = c->channel;
  2024. if (NULL == chn)
  2025. {
  2026. GNUNET_break (0);
  2027. GNUNET_SERVICE_client_drop (client);
  2028. return;
  2029. }
  2030. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2031. "%p Received message from client.\n", chn);
  2032. GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
  2033. if (GNUNET_YES != chn->is_ready)
  2034. {
  2035. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  2036. "%p Channel is not ready yet, disconnecting client %p.\n",
  2037. chn,
  2038. client);
  2039. GNUNET_break (0);
  2040. GNUNET_SERVICE_client_drop (client);
  2041. return;
  2042. }
  2043. uint16_t size = ntohs (msg->size);
  2044. if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
  2045. {
  2046. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2047. "%p Message payload too large: %u < %u.\n",
  2048. chn,
  2049. (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
  2050. (unsigned int) (size - sizeof (*msg)));
  2051. GNUNET_break (0);
  2052. transmit_cancel (chn, client);
  2053. GNUNET_SERVICE_client_drop (client);
  2054. return;
  2055. }
  2056. uint16_t first_ptype = 0, last_ptype = 0;
  2057. if (GNUNET_SYSERR
  2058. == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
  2059. (const char *) &msg[1],
  2060. &first_ptype, &last_ptype))
  2061. {
  2062. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2063. "%p Received invalid message part from client.\n", chn);
  2064. GNUNET_break (0);
  2065. transmit_cancel (chn, client);
  2066. GNUNET_SERVICE_client_drop (client);
  2067. return;
  2068. }
  2069. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2070. "%p Received message with first part type %u and last part type %u.\n",
  2071. chn, first_ptype, last_ptype);
  2072. queue_message (chn, client, size - sizeof (*msg), &msg[1],
  2073. first_ptype, last_ptype);
  2074. transmit_message (chn);
  2075. /* FIXME: send a few ACKs even before transmit_notify is called */
  2076. GNUNET_SERVICE_client_continue (client);
  2077. };
  2078. /**
  2079. * Received result of GNUNET_PSYCSTORE_membership_store()
  2080. */
  2081. static void
  2082. store_recv_membership_store_result (void *cls,
  2083. int64_t result,
  2084. const char *err_msg,
  2085. uint16_t err_msg_size)
  2086. {
  2087. struct Operation *op = cls;
  2088. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2089. "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
  2090. op->channel,
  2091. result,
  2092. (int) err_msg_size,
  2093. err_msg);
  2094. if (NULL != op->client)
  2095. client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
  2096. op_remove (op);
  2097. }
  2098. /**
  2099. * Client requests to add/remove a slave in the membership database.
  2100. */
  2101. static void
  2102. handle_client_membership_store (void *cls,
  2103. const struct ChannelMembershipStoreRequest *req)
  2104. {
  2105. struct Client *c = cls;
  2106. struct GNUNET_SERVICE_Client *client = c->client;
  2107. struct Channel *chn = c->channel;
  2108. if (NULL == chn)
  2109. {
  2110. GNUNET_break (0);
  2111. GNUNET_SERVICE_client_drop (client);
  2112. return;
  2113. }
  2114. struct Operation *op = op_add (chn, client, req->op_id, 0);
  2115. uint64_t announced_at = GNUNET_ntohll (req->announced_at);
  2116. uint64_t effective_since = GNUNET_ntohll (req->effective_since);
  2117. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2118. "%p Received membership store request from client.\n", chn);
  2119. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2120. "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
  2121. chn, req->did_join, announced_at, effective_since);
  2122. GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
  2123. req->did_join, announced_at, effective_since,
  2124. 0, /* FIXME: group_generation */
  2125. &store_recv_membership_store_result, op);
  2126. GNUNET_SERVICE_client_continue (client);
  2127. }
  2128. /**
  2129. * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
  2130. * in response to a history request from a client.
  2131. */
  2132. static int
  2133. store_recv_fragment_history (void *cls,
  2134. struct GNUNET_MULTICAST_MessageHeader *mmsg,
  2135. enum GNUNET_PSYCSTORE_MessageFlags flags)
  2136. {
  2137. struct Operation *op = cls;
  2138. if (NULL == op->client)
  2139. { /* Requesting client already disconnected. */
  2140. return GNUNET_NO;
  2141. }
  2142. struct Channel *chn = op->channel;
  2143. struct GNUNET_PSYC_MessageHeader *pmsg;
  2144. uint16_t msize = ntohs (mmsg->header.size);
  2145. uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
  2146. struct GNUNET_OperationResultMessage *
  2147. res = GNUNET_malloc (sizeof (*res) + psize);
  2148. res->header.size = htons (sizeof (*res) + psize);
  2149. res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
  2150. res->op_id = op->op_id;
  2151. res->result_code = GNUNET_htonll (GNUNET_OK);
  2152. pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
  2153. GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
  2154. GNUNET_memcpy (&res[1], pmsg, psize);
  2155. /** @todo FIXME: send only to requesting client */
  2156. client_send_msg (chn, &res->header);
  2157. GNUNET_free (res);
  2158. return GNUNET_YES;
  2159. }
  2160. /**
  2161. * Received the result of GNUNET_PSYCSTORE_fragment_get(),
  2162. * in response to a history request from a client.
  2163. */
  2164. static void
  2165. store_recv_fragment_history_result (void *cls, int64_t result,
  2166. const char *err_msg, uint16_t err_msg_size)
  2167. {
  2168. struct Operation *op = cls;
  2169. if (NULL == op->client)
  2170. { /* Requesting client already disconnected. */
  2171. return;
  2172. }
  2173. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2174. "%p History replay #%" PRIu64 ": "
  2175. "PSYCSTORE returned %" PRId64 " (%.*s)\n",
  2176. op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
  2177. if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
  2178. {
  2179. /** @todo Multicast replay request for messages not found locally. */
  2180. }
  2181. client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
  2182. op_remove (op);
  2183. }
  2184. static int
  2185. check_client_history_replay (void *cls,
  2186. const struct GNUNET_PSYC_HistoryRequestMessage *req)
  2187. {
  2188. return GNUNET_OK;
  2189. }
  2190. /**
  2191. * Client requests channel history.
  2192. */
  2193. static void
  2194. handle_client_history_replay (void *cls,
  2195. const struct GNUNET_PSYC_HistoryRequestMessage *req)
  2196. {
  2197. struct Client *c = cls;
  2198. struct GNUNET_SERVICE_Client *client = c->client;
  2199. struct Channel *chn = c->channel;
  2200. if (NULL == chn)
  2201. {
  2202. GNUNET_break (0);
  2203. GNUNET_SERVICE_client_drop (client);
  2204. return;
  2205. }
  2206. uint16_t size = ntohs (req->header.size);
  2207. const char *method_prefix = (const char *) &req[1];
  2208. if (size < sizeof (*req) + 1
  2209. || '\0' != method_prefix[size - sizeof (*req) - 1])
  2210. {
  2211. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2212. "%p History replay #%" PRIu64 ": "
  2213. "invalid method prefix. size: %u < %u?\n",
  2214. chn,
  2215. GNUNET_ntohll (req->op_id),
  2216. size,
  2217. (unsigned int) sizeof (*req) + 1);
  2218. GNUNET_break (0);
  2219. GNUNET_SERVICE_client_drop (client);
  2220. return;
  2221. }
  2222. struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
  2223. if (0 == req->message_limit)
  2224. {
  2225. GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
  2226. GNUNET_ntohll (req->start_message_id),
  2227. GNUNET_ntohll (req->end_message_id),
  2228. 0, method_prefix,
  2229. &store_recv_fragment_history,
  2230. &store_recv_fragment_history_result, op);
  2231. }
  2232. else
  2233. {
  2234. GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
  2235. GNUNET_ntohll (req->message_limit),
  2236. method_prefix,
  2237. &store_recv_fragment_history,
  2238. &store_recv_fragment_history_result,
  2239. op);
  2240. }
  2241. GNUNET_SERVICE_client_continue (client);
  2242. }
  2243. /**
  2244. * Received state var from PSYCstore, send it to client.
  2245. */
  2246. static int
  2247. store_recv_state_var (void *cls, const char *name,
  2248. const void *value, uint32_t value_size)
  2249. {
  2250. struct Operation *op = cls;
  2251. struct GNUNET_OperationResultMessage *res;
  2252. struct GNUNET_MQ_Envelope *env;
  2253. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2254. "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
  2255. op->channel, GNUNET_ntohll (op->op_id), name);
  2256. if (NULL != name) /* First part */
  2257. {
  2258. uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
  2259. struct GNUNET_PSYC_MessageModifier *mod;
  2260. env = GNUNET_MQ_msg_extra (res,
  2261. sizeof (*mod) + name_size + value_size,
  2262. GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
  2263. res->op_id = op->op_id;
  2264. mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
  2265. mod->header.size = htons (sizeof (*mod) + name_size + value_size);
  2266. mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
  2267. mod->name_size = htons (name_size);
  2268. mod->value_size = htonl (value_size);
  2269. mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
  2270. GNUNET_memcpy (&mod[1], name, name_size);
  2271. GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
  2272. }
  2273. else /* Continuation */
  2274. {
  2275. struct GNUNET_MessageHeader *mod;
  2276. env = GNUNET_MQ_msg_extra (res,
  2277. sizeof (*mod) + value_size,
  2278. GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
  2279. res->op_id = op->op_id;
  2280. mod = (struct GNUNET_MessageHeader *) &res[1];
  2281. mod->size = htons (sizeof (*mod) + value_size);
  2282. mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
  2283. GNUNET_memcpy (&mod[1], value, value_size);
  2284. }
  2285. // FIXME: client might have been disconnected
  2286. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
  2287. return GNUNET_YES;
  2288. }
  2289. /**
  2290. * Received result of GNUNET_PSYCSTORE_state_get()
  2291. * or GNUNET_PSYCSTORE_state_get_prefix()
  2292. */
  2293. static void
  2294. store_recv_state_result (void *cls, int64_t result,
  2295. const char *err_msg, uint16_t err_msg_size)
  2296. {
  2297. struct Operation *op = cls;
  2298. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2299. "%p state_get #%" PRIu64 ": "
  2300. "PSYCSTORE returned %" PRId64 " (%.*s)\n",
  2301. op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
  2302. // FIXME: client might have been disconnected
  2303. client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
  2304. op_remove (op);
  2305. }
  2306. static int
  2307. check_client_state_get (void *cls,
  2308. const struct StateRequest *req)
  2309. {
  2310. struct Client *c = cls;
  2311. struct Channel *chn = c->channel;
  2312. if (NULL == chn)
  2313. {
  2314. GNUNET_break (0);
  2315. return GNUNET_SYSERR;
  2316. }
  2317. uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
  2318. const char *name = (const char *) &req[1];
  2319. if (0 == name_size || '\0' != name[name_size - 1])
  2320. {
  2321. GNUNET_break (0);
  2322. return GNUNET_SYSERR;
  2323. }
  2324. return GNUNET_OK;
  2325. }
  2326. /**
  2327. * Client requests best matching state variable from PSYCstore.
  2328. */
  2329. static void
  2330. handle_client_state_get (void *cls,
  2331. const struct StateRequest *req)
  2332. {
  2333. struct Client *c = cls;
  2334. struct GNUNET_SERVICE_Client *client = c->client;
  2335. struct Channel *chn = c->channel;
  2336. const char *name = (const char *) &req[1];
  2337. struct Operation *op = op_add (chn, client, req->op_id, 0);
  2338. GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
  2339. &store_recv_state_var,
  2340. &store_recv_state_result, op);
  2341. GNUNET_SERVICE_client_continue (client);
  2342. }
  2343. static int
  2344. check_client_state_get_prefix (void *cls,
  2345. const struct StateRequest *req)
  2346. {
  2347. struct Client *c = cls;
  2348. struct Channel *chn = c->channel;
  2349. if (NULL == chn)
  2350. {
  2351. GNUNET_break (0);
  2352. return GNUNET_SYSERR;
  2353. }
  2354. uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
  2355. const char *name = (const char *) &req[1];
  2356. if (0 == name_size || '\0' != name[name_size - 1])
  2357. {
  2358. GNUNET_break (0);
  2359. return GNUNET_SYSERR;
  2360. }
  2361. return GNUNET_OK;
  2362. }
  2363. /**
  2364. * Client requests state variables with a given prefix from PSYCstore.
  2365. */
  2366. static void
  2367. handle_client_state_get_prefix (void *cls,
  2368. const struct StateRequest *req)
  2369. {
  2370. struct Client *c = cls;
  2371. struct GNUNET_SERVICE_Client *client = c->client;
  2372. struct Channel *chn = c->channel;
  2373. const char *name = (const char *) &req[1];
  2374. struct Operation *op = op_add (chn, client, req->op_id, 0);
  2375. GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
  2376. &store_recv_state_var,
  2377. &store_recv_state_result, op);
  2378. GNUNET_SERVICE_client_continue (client);
  2379. }
  2380. /**
  2381. * Initialize the PSYC service.
  2382. *
  2383. * @param cls Closure.
  2384. * @param server The initialized server.
  2385. * @param c Configuration to use.
  2386. */
  2387. static void
  2388. run (void *cls,
  2389. const struct GNUNET_CONFIGURATION_Handle *c,
  2390. struct GNUNET_SERVICE_Handle *svc)
  2391. {
  2392. cfg = c;
  2393. service = svc;
  2394. store = GNUNET_PSYCSTORE_connect (cfg);
  2395. stats = GNUNET_STATISTICS_create ("psyc", cfg);
  2396. masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  2397. slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  2398. channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  2399. recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  2400. GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
  2401. }
  2402. /**
  2403. * Define "main" method using service macro.
  2404. */
  2405. GNUNET_SERVICE_MAIN
  2406. ("psyc",
  2407. GNUNET_SERVICE_OPTION_NONE,
  2408. &run,
  2409. &client_notify_connect,
  2410. &client_notify_disconnect,
  2411. NULL,
  2412. GNUNET_MQ_hd_fixed_size (client_master_start,
  2413. GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
  2414. struct MasterStartRequest,
  2415. NULL),
  2416. GNUNET_MQ_hd_var_size (client_slave_join,
  2417. GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
  2418. struct SlaveJoinRequest,
  2419. NULL),
  2420. GNUNET_MQ_hd_var_size (client_join_decision,
  2421. GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
  2422. struct GNUNET_PSYC_JoinDecisionMessage,
  2423. NULL),
  2424. GNUNET_MQ_hd_fixed_size (client_part_request,
  2425. GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
  2426. struct GNUNET_MessageHeader,
  2427. NULL),
  2428. GNUNET_MQ_hd_var_size (client_psyc_message,
  2429. GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
  2430. struct GNUNET_MessageHeader,
  2431. NULL),
  2432. GNUNET_MQ_hd_fixed_size (client_membership_store,
  2433. GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
  2434. struct ChannelMembershipStoreRequest,
  2435. NULL),
  2436. GNUNET_MQ_hd_var_size (client_history_replay,
  2437. GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
  2438. struct GNUNET_PSYC_HistoryRequestMessage,
  2439. NULL),
  2440. GNUNET_MQ_hd_var_size (client_state_get,
  2441. GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
  2442. struct StateRequest,
  2443. NULL),
  2444. GNUNET_MQ_hd_var_size (client_state_get_prefix,
  2445. GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
  2446. struct StateRequest,
  2447. NULL));
  2448. /* end of gnunet-service-psyc.c */