LCOV - code coverage report
Current view: top level - kernel/printk - printk_ringbuffer.c
Test: coverage.info
Date: 2023-08-24 13:40:31

                 Hit   Total   Coverage
  Lines:         232     363     63.9 %
  Functions:      20      27     74.1 %

          Line data    Source code
       1             : // SPDX-License-Identifier: GPL-2.0
       2             : 
       3             : #include <linux/kernel.h>
       4             : #include <linux/irqflags.h>
       5             : #include <linux/string.h>
       6             : #include <linux/errno.h>
       7             : #include <linux/bug.h>
       8             : #include "printk_ringbuffer.h"
       9             : 
      10             : /**
      11             :  * DOC: printk_ringbuffer overview
      12             :  *
      13             :  * Data Structure
      14             :  * --------------
      15             :  * The printk_ringbuffer is made up of 2 internal ringbuffers:
      16             :  *
      17             :  *   desc_ring
      18             :  *     A ring of descriptors and their meta data (such as sequence number,
      19             :  *     timestamp, loglevel, etc.) as well as internal state information about
      20             :  *     the record and logical positions specifying where in the other
      21             :  *     ringbuffer the text strings are located.
      22             :  *
      23             :  *   text_data_ring
      24             :  *     A ring of data blocks. A data block consists of an unsigned long
      25             :  *     integer (ID) that maps to a desc_ring index followed by the text
      26             :  *     string of the record.
      27             :  *
      28             :  * The internal state information of a descriptor is the key element to allow
      29             :  * readers and writers to locklessly synchronize access to the data.
      30             :  *
      31             :  * Implementation
      32             :  * --------------
      33             :  *
      34             :  * Descriptor Ring
      35             :  * ~~~~~~~~~~~~~~~
      36             :  * The descriptor ring is an array of descriptors. A descriptor contains
      37             :  * essential meta data to track the data of a printk record using
      38             :  * blk_lpos structs pointing to associated text data blocks (see
      39             :  * "Data Rings" below). Each descriptor is assigned an ID that maps
      40             :  * directly to index values of the descriptor array and has a state. The ID
      41             :  * and the state are bitwise combined into a single descriptor field named
      42             :  * @state_var, allowing ID and state to be synchronously and atomically
      43             :  * updated.
      44             :  *
      45             :  * Descriptors have four states:
      46             :  *
      47             :  *   reserved
      48             :  *     A writer is modifying the record.
      49             :  *
      50             :  *   committed
      51             :  *     The record and all its data are written. A writer can reopen the
      52             :  *     descriptor (transitioning it back to reserved), but in the committed
      53             :  *     state the data is consistent.
      54             :  *
      55             :  *   finalized
      56             :  *     The record and all its data are complete and available for reading. A
      57             :  *     writer cannot reopen the descriptor.
      58             :  *
      59             :  *   reusable
      60             :  *     The record exists, but its text and/or meta data may no longer be
      61             :  *     available.
      62             :  *
      63             :  * Querying the @state_var of a record requires providing the ID of the
      64             :  * descriptor to query. This can yield a possible fifth (pseudo) state:
      65             :  *
      66             :  *   miss
      67             :  *     The descriptor being queried has an unexpected ID.
      68             :  *
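                      :  * A minimal sketch of how such a state query works (illustrative only;
                      :  * this is essentially what get_desc_state() below implements)::
                      :  *
                      :  *      enum desc_state state;
                      :  *      unsigned long sv;
                      :  *
                      :  *      sv = atomic_long_read(&desc->state_var);
                      :  *      if (DESC_ID(sv) != id)
                      :  *              state = desc_miss;      // descriptor was recycled
                      :  *      else
                      :  *              state = DESC_STATE(sv); // reserved/committed/finalized/reusable
                      :  *
                      :  * Writers pack both halves with DESC_SV(), e.g. DESC_SV(id, desc_reserved),
                      :  * so that ID and state are updated with a single atomic store.
                      :  *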
      69             :  * The descriptor ring has a @tail_id that contains the ID of the oldest
      70             :  * descriptor and @head_id that contains the ID of the newest descriptor.
      71             :  *
      72             :  * When a new descriptor should be created (and the ring is full), the tail
      73             :  * descriptor is invalidated by first transitioning to the reusable state and
      74             :  * then invalidating all tail data blocks up to and including the data block
      75             :  * associated with the tail descriptor. Then
      76             :  * @tail_id is advanced, followed by advancing @head_id. And finally the
      77             :  * @state_var of the new descriptor is initialized to the new ID and reserved
      78             :  * state.
      79             :  *
      80             :  * The @tail_id can only be advanced if the new @tail_id would be in the
      81             :  * finalized or reusable queried state. This ensures that a valid
      82             :  * sequence number for the tail is always available.
      83             :  *
      84             :  * Descriptor Finalization
      85             :  * ~~~~~~~~~~~~~~~~~~~~~~~
      86             :  * When a writer calls the commit function prb_commit(), record data is
      87             :  * fully stored and is consistent within the ringbuffer. However, a writer can
      88             :  * reopen that record, claiming exclusive access (as with prb_reserve()), and
      89             :  * modify that record. When finished, the writer must again commit the record.
      90             :  *
      91             :  * In order for a record to be made available to readers (and also become
      92             :  * recyclable for writers), it must be finalized. A finalized record cannot be
      93             :  * reopened and can never become "unfinalized". Record finalization can occur
      94             :  * in three different scenarios:
      95             :  *
      96             :  *   1) A writer can simultaneously commit and finalize its record by calling
      97             :  *      prb_final_commit() instead of prb_commit().
      98             :  *
      99             :  *   2) When a new record is reserved and the previous record has been
     100             :  *      committed via prb_commit(), that previous record is automatically
     101             :  *      finalized.
     102             :  *
     103             :  *   3) When a record is committed via prb_commit() and a newer record
     104             :  *      already exists, the record being committed is automatically finalized.
     105             :  *
     106             :  * Data Ring
     107             :  * ~~~~~~~~~
     108             :  * The text data ring is a byte array composed of data blocks. Data blocks are
     109             :  * referenced by blk_lpos structs that point to the logical position of the
     110             :  * beginning of a data block and the beginning of the next adjacent data
     111             :  * block. Logical positions are mapped directly to index values of the byte
     112             :  * array ringbuffer.
     113             :  *
     114             :  * Each data block consists of an ID followed by the writer data. The ID is
     115             :  * the identifier of a descriptor that is associated with the data block. A
     116             :  * given data block is considered valid if all of the following conditions
     117             :  * are met:
     118             :  *
     119             :  *   1) The descriptor associated with the data block is in the committed
     120             :  *      or finalized queried state.
     121             :  *
     122             :  *   2) The blk_lpos struct within the descriptor associated with the data
     123             :  *      block references back to the same data block.
     124             :  *
     125             :  *   3) The data block is within the head/tail logical position range.
     126             :  *
     127             :  * If the writer data of a data block would extend beyond the end of the
     128             :  * byte array, only the ID of the data block is stored at the logical
     129             :  * position and the full data block (ID and writer data) is stored at the
     130             :  * beginning of the byte array. The referencing blk_lpos will point to the
     131             :  * ID before the wrap and the next data block will be at the logical
      132             :  * position adjacent to the full data block after the wrap.
     133             :  *
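                      :  * A small worked example of the wrapping case (illustrative only, using an
                      :  * unrealistically small 32 byte data ring, i.e. size_bits == 5): if the
                      :  * head is at lpos 24 and a 16 byte data block is needed, it cannot fit in
                      :  * the 8 bytes remaining in this wrap. Only the ID is written at index 24
                      :  * and the full data block (ID plus writer data) is written at index 0 of
                      :  * the next wrap. The descriptor's blk_lpos.begin is 24 and blk_lpos.next
                      :  * is 48 (wrap 1, index 16), i.e. adjacent to the relocated data block.
                      :  *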
     134             :  * Data rings have a @tail_lpos that points to the beginning of the oldest
     135             :  * data block and a @head_lpos that points to the logical position of the
     136             :  * next (not yet existing) data block.
     137             :  *
     138             :  * When a new data block should be created (and the ring is full), tail data
     139             :  * blocks will first be invalidated by putting their associated descriptors
     140             :  * into the reusable state and then pushing the @tail_lpos forward beyond
     141             :  * them. Then the @head_lpos is pushed forward and is associated with a new
     142             :  * descriptor. If a data block is not valid, the @tail_lpos cannot be
     143             :  * advanced beyond it.
     144             :  *
     145             :  * Info Array
     146             :  * ~~~~~~~~~~
      147             :  * The general meta data of printk records is stored in printk_info structs,
      148             :  * held in an array with the same number of elements as the descriptor ring.
     149             :  * Each info corresponds to the descriptor of the same index in the
     150             :  * descriptor ring. Info validity is confirmed by evaluating the corresponding
     151             :  * descriptor before and after loading the info.
     152             :  *
     153             :  * Usage
     154             :  * -----
     155             :  * Here are some simple examples demonstrating writers and readers. For the
     156             :  * examples a global ringbuffer (test_rb) is available (which is not the
     157             :  * actual ringbuffer used by printk)::
     158             :  *
     159             :  *      DEFINE_PRINTKRB(test_rb, 15, 5);
     160             :  *
     161             :  * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
     162             :  * 1 MiB (2 ^ (15 + 5)) for text data.
     163             :  *
     164             :  * Sample writer code::
     165             :  *
     166             :  *      const char *textstr = "message text";
     167             :  *      struct prb_reserved_entry e;
     168             :  *      struct printk_record r;
     169             :  *
     170             :  *      // specify how much to allocate
     171             :  *      prb_rec_init_wr(&r, strlen(textstr) + 1);
     172             :  *
     173             :  *      if (prb_reserve(&e, &test_rb, &r)) {
     174             :  *              snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
     175             :  *
     176             :  *              r.info->text_len = strlen(textstr);
     177             :  *              r.info->ts_nsec = local_clock();
     178             :  *              r.info->caller_id = printk_caller_id();
     179             :  *
     180             :  *              // commit and finalize the record
     181             :  *              prb_final_commit(&e);
     182             :  *      }
     183             :  *
     184             :  * Note that additional writer functions are available to extend a record
     185             :  * after it has been committed but not yet finalized. This can be done as
     186             :  * long as no new records have been reserved and the caller is the same.
     187             :  *
     188             :  * Sample writer code (record extending)::
     189             :  *
     190             :  *              // alternate rest of previous example
     191             :  *
     192             :  *              r.info->text_len = strlen(textstr);
     193             :  *              r.info->ts_nsec = local_clock();
     194             :  *              r.info->caller_id = printk_caller_id();
     195             :  *
     196             :  *              // commit the record (but do not finalize yet)
     197             :  *              prb_commit(&e);
     198             :  *      }
     199             :  *
     200             :  *      ...
     201             :  *
     202             :  *      // specify additional 5 bytes text space to extend
     203             :  *      prb_rec_init_wr(&r, 5);
     204             :  *
     205             :  *      // try to extend, but only if it does not exceed 32 bytes
     206             :  *      if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id(), 32)) {
     207             :  *              snprintf(&r.text_buf[r.info->text_len],
     208             :  *                       r.text_buf_size - r.info->text_len, "hello");
     209             :  *
     210             :  *              r.info->text_len += 5;
     211             :  *
     212             :  *              // commit and finalize the record
     213             :  *              prb_final_commit(&e);
     214             :  *      }
     215             :  *
     216             :  * Sample reader code::
     217             :  *
     218             :  *      struct printk_info info;
     219             :  *      struct printk_record r;
     220             :  *      char text_buf[32];
     221             :  *      u64 seq;
     222             :  *
     223             :  *      prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
     224             :  *
     225             :  *      prb_for_each_record(0, &test_rb, &seq, &r) {
     226             :  *              if (info.seq != seq)
     227             :  *                      pr_warn("lost %llu records\n", info.seq - seq);
     228             :  *
     229             :  *              if (info.text_len > r.text_buf_size) {
     230             :  *                      pr_warn("record %llu text truncated\n", info.seq);
     231             :  *                      text_buf[r.text_buf_size - 1] = 0;
     232             :  *              }
     233             :  *
     234             :  *              pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec,
     235             :  *                      &text_buf[0]);
     236             :  *      }
     237             :  *
     238             :  * Note that additional less convenient reader functions are available to
     239             :  * allow complex record access.
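                      :  *
                      :  * For example, a reader wanting one specific record rather than a loop
                      :  * could use prb_read_valid() directly (sketch only, reusing the setup from
                      :  * the reader example above)::
                      :  *
                      :  *      prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
                      :  *
                      :  *      if (prb_read_valid(&test_rb, seq, &r))
                      :  *              pr_info("%llu: %s\n", info.seq, &text_buf[0]);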
     240             :  *
     241             :  * ABA Issues
     242             :  * ~~~~~~~~~~
     243             :  * To help avoid ABA issues, descriptors are referenced by IDs (array index
     244             :  * values combined with tagged bits counting array wraps) and data blocks are
     245             :  * referenced by logical positions (array index values combined with tagged
     246             :  * bits counting array wraps). However, on 32-bit systems the number of
     247             :  * tagged bits is relatively small such that an ABA incident is (at least
      248             :  * theoretically) possible. For example, if 4 million maximally sized (1KiB)
      249             :  * printk messages were to occur in NMI context on a 32-bit system (about
      250             :  * 2^22 records x 2^10 bytes = 2^32 bytes, one full wrap), the interrupted
      251             :  * context would not be able to recognize that the 32-bit integer completely
      252             :  * wrapped and thus represents a different data block than the one it expects.
     253             :  *
     254             :  * To help combat this possibility, additional state checking is performed
     255             :  * (such as using cmpxchg() even though set() would suffice). These extra
     256             :  * checks are commented as such and will hopefully catch any ABA issue that
     257             :  * a 32-bit system might experience.
     258             :  *
     259             :  * Memory Barriers
     260             :  * ~~~~~~~~~~~~~~~
     261             :  * Multiple memory barriers are used. To simplify proving correctness and
     262             :  * generating litmus tests, lines of code related to memory barriers
     263             :  * (loads, stores, and the associated memory barriers) are labeled::
     264             :  *
     265             :  *      LMM(function:letter)
     266             :  *
     267             :  * Comments reference the labels using only the "function:letter" part.
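                      :  *
                      :  * For example, the read barrier in desc_read() below is labeled
                      :  * LMM(desc_read:B), and other barrier comments refer to it simply as
                      :  * "desc_read:B".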
     268             :  *
     269             :  * The memory barrier pairs and their ordering are:
     270             :  *
     271             :  *   desc_reserve:D / desc_reserve:B
     272             :  *     push descriptor tail (id), then push descriptor head (id)
     273             :  *
     274             :  *   desc_reserve:D / data_push_tail:B
     275             :  *     push data tail (lpos), then set new descriptor reserved (state)
     276             :  *
     277             :  *   desc_reserve:D / desc_push_tail:C
     278             :  *     push descriptor tail (id), then set new descriptor reserved (state)
     279             :  *
     280             :  *   desc_reserve:D / prb_first_seq:C
     281             :  *     push descriptor tail (id), then set new descriptor reserved (state)
     282             :  *
     283             :  *   desc_reserve:F / desc_read:D
     284             :  *     set new descriptor id and reserved (state), then allow writer changes
     285             :  *
     286             :  *   data_alloc:A (or data_realloc:A) / desc_read:D
     287             :  *     set old descriptor reusable (state), then modify new data block area
     288             :  *
     289             :  *   data_alloc:A (or data_realloc:A) / data_push_tail:B
     290             :  *     push data tail (lpos), then modify new data block area
     291             :  *
     292             :  *   _prb_commit:B / desc_read:B
     293             :  *     store writer changes, then set new descriptor committed (state)
     294             :  *
     295             :  *   desc_reopen_last:A / _prb_commit:B
     296             :  *     set descriptor reserved (state), then read descriptor data
     297             :  *
     298             :  *   _prb_commit:B / desc_reserve:D
     299             :  *     set new descriptor committed (state), then check descriptor head (id)
     300             :  *
     301             :  *   data_push_tail:D / data_push_tail:A
     302             :  *     set descriptor reusable (state), then push data tail (lpos)
     303             :  *
     304             :  *   desc_push_tail:B / desc_reserve:D
     305             :  *     set descriptor reusable (state), then push descriptor tail (id)
     306             :  */
     307             : 
     308             : #define DATA_SIZE(data_ring)            _DATA_SIZE((data_ring)->size_bits)
     309             : #define DATA_SIZE_MASK(data_ring)       (DATA_SIZE(data_ring) - 1)
     310             : 
     311             : #define DESCS_COUNT(desc_ring)          _DESCS_COUNT((desc_ring)->count_bits)
     312             : #define DESCS_COUNT_MASK(desc_ring)     (DESCS_COUNT(desc_ring) - 1)
     313             : 
     314             : /* Determine the data array index from a logical position. */
     315             : #define DATA_INDEX(data_ring, lpos)     ((lpos) & DATA_SIZE_MASK(data_ring))
     316             : 
     317             : /* Determine the desc array index from an ID or sequence number. */
     318             : #define DESC_INDEX(desc_ring, n)        ((n) & DESCS_COUNT_MASK(desc_ring))
     319             : 
     320             : /* Determine how many times the data array has wrapped. */
     321             : #define DATA_WRAPS(data_ring, lpos)     ((lpos) >> (data_ring)->size_bits)
     322             : 
     323             : /* Determine if a logical position refers to a data-less block. */
     324             : #define LPOS_DATALESS(lpos)             ((lpos) & 1UL)
     325             : #define BLK_DATALESS(blk)               (LPOS_DATALESS((blk)->begin) && \
     326             :                                          LPOS_DATALESS((blk)->next))
     327             : 
     328             : /* Get the logical position at index 0 of the current wrap. */
     329             : #define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
     330             : ((lpos) & ~DATA_SIZE_MASK(data_ring))
     331             : 
     332             : /* Get the ID for the same index of the previous wrap as the given ID. */
     333             : #define DESC_ID_PREV_WRAP(desc_ring, id) \
     334             : DESC_ID((id) - DESCS_COUNT(desc_ring))
     335             : 
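                      : /*
                      :  * Illustrative example of the macros above (values chosen for brevity, not
                      :  * from a real configuration): with size_bits == 5 (a 32 byte data ring),
                      :  * lpos == 70 gives DATA_INDEX() == 6 (70 & 31) and DATA_WRAPS() == 2
                      :  * (70 >> 5). LPOS_DATALESS() is true for any lpos with bit 0 set.
                      :  */
                      : 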
     336             : /*
     337             :  * A data block: mapped directly to the beginning of the data block area
     338             :  * specified as a logical position within the data ring.
     339             :  *
     340             :  * @id:   the ID of the associated descriptor
     341             :  * @data: the writer data
     342             :  *
     343             :  * Note that the size of a data block is only known by its associated
     344             :  * descriptor.
     345             :  */
     346             : struct prb_data_block {
     347             :         unsigned long   id;
     348             :         char            data[];
     349             : };
     350             : 
     351             : /*
     352             :  * Return the descriptor associated with @n. @n can be either a
     353             :  * descriptor ID or a sequence number.
     354             :  */
     355             : static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
     356             : {
     357        3820 :         return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
     358             : }
     359             : 
     360             : /*
     361             :  * Return the printk_info associated with @n. @n can be either a
     362             :  * descriptor ID or a sequence number.
     363             :  */
     364             : static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n)
     365             : {
     366        3007 :         return &desc_ring->infos[DESC_INDEX(desc_ring, n)];
     367             : }
     368             : 
     369             : static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
     370             :                                        unsigned long begin_lpos)
     371             : {
     372         726 :         return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
     373             : }
     374             : 
     375             : /*
     376             :  * Increase the data size to account for data block meta data plus any
     377             :  * padding so that the adjacent data block is aligned on the ID size.
     378             :  */
     379             : static unsigned int to_blk_size(unsigned int size)
     380             : {
     381         568 :         struct prb_data_block *db = NULL;
     382             : 
     383         568 :         size += sizeof(*db);
     384         568 :         size = ALIGN(size, sizeof(db->id));
     385             :         return size;
     386             : }
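                      : 
                      : /*
                      :  * Illustrative example (assuming a 64-bit system, i.e. sizeof(db->id) == 8):
                      :  * to_blk_size(13) == 24, i.e. 13 bytes of text plus the 8 byte ID, padded
                      :  * up to the next multiple of 8.
                      :  */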
     387             : 
     388             : /*
     389             :  * Sanity checker for reserve size. The ringbuffer code assumes that a data
     390             :  * block does not exceed the maximum possible size that could fit within the
     391             :  * ringbuffer. This function provides that basic size check so that the
     392             :  * assumption is safe.
     393             :  */
     394             : static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
     395             : {
     396         284 :         struct prb_data_block *db = NULL;
     397             : 
     398         284 :         if (size == 0)
     399             :                 return true;
     400             : 
     401             :         /*
     402             :          * Ensure the alignment padded size could possibly fit in the data
     403             :          * array. The largest possible data block must still leave room for
     404             :          * at least the ID of the next block.
     405             :          */
     406         284 :         size = to_blk_size(size);
     407         284 :         if (size > DATA_SIZE(data_ring) - sizeof(db->id))
     408             :                 return false;
     409             : 
     410             :         return true;
     411             : }
     412             : 
     413             : /* Query the state of a descriptor. */
     414             : static enum desc_state get_desc_state(unsigned long id,
     415             :                                       unsigned long state_val)
     416             : {
     417        3568 :         if (id != DESC_ID(state_val))
     418             :                 return desc_miss;
     419             : 
     420        3490 :         return DESC_STATE(state_val);
     421             : }
     422             : 
     423             : /*
     424             :  * Get a copy of a specified descriptor and return its queried state. If the
     425             :  * descriptor is in an inconsistent state (miss or reserved), the caller can
     426             :  * only expect the descriptor's @state_var field to be valid.
     427             :  *
     428             :  * The sequence number and caller_id can be optionally retrieved. Like all
     429             :  * non-state_var data, they are only valid if the descriptor is in a
     430             :  * consistent state.
     431             :  */
     432        1823 : static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
     433             :                                  unsigned long id, struct prb_desc *desc_out,
     434             :                                  u64 *seq_out, u32 *caller_id_out)
     435             : {
     436        3646 :         struct printk_info *info = to_info(desc_ring, id);
     437        3646 :         struct prb_desc *desc = to_desc(desc_ring, id);
     438        1823 :         atomic_long_t *state_var = &desc->state_var;
     439             :         enum desc_state d_state;
     440             :         unsigned long state_val;
     441             : 
     442             :         /* Check the descriptor state. */
     443        1823 :         state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
     444        1823 :         d_state = get_desc_state(id, state_val);
     445        1823 :         if (d_state == desc_miss || d_state == desc_reserved) {
     446             :                 /*
     447             :                  * The descriptor is in an inconsistent state. Set at least
     448             :                  * @state_var so that the caller can see the details of
     449             :                  * the inconsistent state.
     450             :                  */
     451             :                 goto out;
     452             :         }
     453             : 
     454             :         /*
     455             :          * Guarantee the state is loaded before copying the descriptor
     456             :          * content. This avoids copying obsolete descriptor content that might
     457             :          * not apply to the descriptor state. This pairs with _prb_commit:B.
     458             :          *
     459             :          * Memory barrier involvement:
     460             :          *
     461             :          * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
     462             :          * from _prb_commit:A.
     463             :          *
     464             :          * Relies on:
     465             :          *
     466             :          * WMB from _prb_commit:A to _prb_commit:B
     467             :          *    matching
     468             :          * RMB from desc_read:A to desc_read:C
     469             :          */
     470        1745 :         smp_rmb(); /* LMM(desc_read:B) */
     471             : 
     472             :         /*
     473             :          * Copy the descriptor data. The data is not valid until the
     474             :          * state has been re-checked. A memcpy() for all of @desc
     475             :          * cannot be used because of the atomic_t @state_var field.
     476             :          */
     477        1745 :         if (desc_out) {
     478        3488 :                 memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
     479             :                        sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
     480             :         }
     481        1745 :         if (seq_out)
     482        1723 :                 *seq_out = info->seq; /* also part of desc_read:C */
     483        1745 :         if (caller_id_out)
     484          22 :                 *caller_id_out = info->caller_id; /* also part of desc_read:C */
     485             : 
     486             :         /*
     487             :          * 1. Guarantee the descriptor content is loaded before re-checking
     488             :          *    the state. This avoids reading an obsolete descriptor state
     489             :          *    that may not apply to the copied content. This pairs with
     490             :          *    desc_reserve:F.
     491             :          *
     492             :          *    Memory barrier involvement:
     493             :          *
     494             :          *    If desc_read:C reads from desc_reserve:G, then desc_read:E
     495             :          *    reads from desc_reserve:F.
     496             :          *
     497             :          *    Relies on:
     498             :          *
     499             :          *    WMB from desc_reserve:F to desc_reserve:G
     500             :          *       matching
     501             :          *    RMB from desc_read:C to desc_read:E
     502             :          *
     503             :          * 2. Guarantee the record data is loaded before re-checking the
     504             :          *    state. This avoids reading an obsolete descriptor state that may
     505             :          *    not apply to the copied data. This pairs with data_alloc:A and
     506             :          *    data_realloc:A.
     507             :          *
     508             :          *    Memory barrier involvement:
     509             :          *
     510             :          *    If copy_data:A reads from data_alloc:B, then desc_read:E
     511             :          *    reads from desc_make_reusable:A.
     512             :          *
     513             :          *    Relies on:
     514             :          *
     515             :          *    MB from desc_make_reusable:A to data_alloc:B
     516             :          *       matching
     517             :          *    RMB from desc_read:C to desc_read:E
     518             :          *
     519             :          *    Note: desc_make_reusable:A and data_alloc:B can be different
     520             :          *          CPUs. However, the data_alloc:B CPU (which performs the
     521             :          *          full memory barrier) must have previously seen
     522             :          *          desc_make_reusable:A.
     523             :          */
     524        1745 :         smp_rmb(); /* LMM(desc_read:D) */
     525             : 
     526             :         /*
     527             :          * The data has been copied. Return the current descriptor state,
     528             :          * which may have changed since the load above.
     529             :          */
     530        1745 :         state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
     531             :         d_state = get_desc_state(id, state_val);
     532             : out:
     533        1823 :         if (desc_out)
     534        1822 :                 atomic_long_set(&desc_out->state_var, state_val);
     535        1823 :         return d_state;
     536             : }
     537             : 
     538             : /*
     539             :  * Take a specified descriptor out of the finalized state by attempting
     540             :  * the transition from finalized to reusable. Either this context or some
     541             :  * other context will have been successful.
     542             :  */
     543             : static void desc_make_reusable(struct prb_desc_ring *desc_ring,
     544             :                                unsigned long id)
     545             : {
     546           0 :         unsigned long val_finalized = DESC_SV(id, desc_finalized);
     547           0 :         unsigned long val_reusable = DESC_SV(id, desc_reusable);
     548           0 :         struct prb_desc *desc = to_desc(desc_ring, id);
     549           0 :         atomic_long_t *state_var = &desc->state_var;
     550             : 
     551           0 :         atomic_long_cmpxchg_relaxed(state_var, val_finalized,
     552             :                                     val_reusable); /* LMM(desc_make_reusable:A) */
     553             : }
     554             : 
     555             : /*
     556             :  * Given the text data ring, put the associated descriptor of each
     557             :  * data block from @lpos_begin until @lpos_end into the reusable state.
     558             :  *
     559             :  * If there is any problem making the associated descriptor reusable, either
     560             :  * the descriptor has not yet been finalized or another writer context has
     561             :  * already pushed the tail lpos past the problematic data block. Regardless,
     562             :  * on error the caller can re-load the tail lpos to determine the situation.
     563             :  */
     564           0 : static bool data_make_reusable(struct printk_ringbuffer *rb,
     565             :                                unsigned long lpos_begin,
     566             :                                unsigned long lpos_end,
     567             :                                unsigned long *lpos_out)
     568             : {
     569             : 
     570           0 :         struct prb_data_ring *data_ring = &rb->text_data_ring;
     571           0 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
     572             :         struct prb_data_block *blk;
     573             :         enum desc_state d_state;
     574             :         struct prb_desc desc;
     575           0 :         struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos;
     576             :         unsigned long id;
     577             : 
     578             :         /* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
     579           0 :         while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
     580           0 :                 blk = to_block(data_ring, lpos_begin);
     581             : 
     582             :                 /*
     583             :                  * Load the block ID from the data block. This is a data race
     584             :                  * against a writer that may have newly reserved this data
     585             :                  * area. If the loaded value matches a valid descriptor ID,
     586             :                  * the blk_lpos of that descriptor will be checked to make
     587             :                  * sure it points back to this data block. If the check fails,
     588             :                  * the data area has been recycled by another writer.
     589             :                  */
     590           0 :                 id = blk->id; /* LMM(data_make_reusable:A) */
     591             : 
     592           0 :                 d_state = desc_read(desc_ring, id, &desc,
     593             :                                     NULL, NULL); /* LMM(data_make_reusable:B) */
     594             : 
     595           0 :                 switch (d_state) {
     596             :                 case desc_miss:
     597             :                 case desc_reserved:
     598             :                 case desc_committed:
     599             :                         return false;
     600             :                 case desc_finalized:
     601             :                         /*
     602             :                          * This data block is invalid if the descriptor
     603             :                          * does not point back to it.
     604             :                          */
     605           0 :                         if (blk_lpos->begin != lpos_begin)
     606             :                                 return false;
     607             :                         desc_make_reusable(desc_ring, id);
     608             :                         break;
     609             :                 case desc_reusable:
     610             :                         /*
     611             :                          * This data block is invalid if the descriptor
     612             :                          * does not point back to it.
     613             :                          */
     614           0 :                         if (blk_lpos->begin != lpos_begin)
     615             :                                 return false;
     616             :                         break;
     617             :                 }
     618             : 
     619             :                 /* Advance @lpos_begin to the next data block. */
     620           0 :                 lpos_begin = blk_lpos->next;
     621             :         }
     622             : 
     623           0 :         *lpos_out = lpos_begin;
     624           0 :         return true;
     625             : }
     626             : 
     627             : /*
     628             :  * Advance the data ring tail to at least @lpos. This function puts
     629             :  * descriptors into the reusable state if the tail is pushed beyond
     630             :  * their associated data block.
     631             :  */
     632         279 : static bool data_push_tail(struct printk_ringbuffer *rb, unsigned long lpos)
     633             : {
     634         279 :         struct prb_data_ring *data_ring = &rb->text_data_ring;
     635             :         unsigned long tail_lpos_new;
     636             :         unsigned long tail_lpos;
     637             :         unsigned long next_lpos;
     638             : 
     639             :         /* If @lpos is from a data-less block, there is nothing to do. */
     640         279 :         if (LPOS_DATALESS(lpos))
     641             :                 return true;
     642             : 
     643             :         /*
     644             :          * Any descriptor states that have transitioned to reusable due to the
     645             :          * data tail being pushed to this loaded value will be visible to this
     646             :          * CPU. This pairs with data_push_tail:D.
     647             :          *
     648             :          * Memory barrier involvement:
     649             :          *
     650             :          * If data_push_tail:A reads from data_push_tail:D, then this CPU can
     651             :          * see desc_make_reusable:A.
     652             :          *
     653             :          * Relies on:
     654             :          *
     655             :          * MB from desc_make_reusable:A to data_push_tail:D
     656             :          *    matches
     657             :          * READFROM from data_push_tail:D to data_push_tail:A
     658             :          *    thus
     659             :          * READFROM from desc_make_reusable:A to this CPU
     660             :          */
     661         558 :         tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */
     662             : 
     663             :         /*
     664             :          * Loop until the tail lpos is at or beyond @lpos. This condition
     665             :          * may already be satisfied, resulting in no full memory barrier
     666             :          * from data_push_tail:D being performed. However, since this CPU
     667             :          * sees the new tail lpos, any descriptor states that transitioned to
     668             :          * the reusable state must already be visible.
     669             :          */
     670         558 :         while ((lpos - tail_lpos) - 1 < DATA_SIZE(data_ring)) {
     671             :                 /*
     672             :                  * Make all descriptors reusable that are associated with
     673             :                  * data blocks before @lpos.
     674             :                  */
     675           0 :                 if (!data_make_reusable(rb, tail_lpos, lpos, &next_lpos)) {
     676             :                         /*
     677             :                          * 1. Guarantee the block ID loaded in
     678             :                          *    data_make_reusable() is performed before
     679             :                          *    reloading the tail lpos. The failed
     680             :                          *    data_make_reusable() may be due to a newly
     681             :                          *    recycled data area causing the tail lpos to
     682             :                          *    have been previously pushed. This pairs with
     683             :                          *    data_alloc:A and data_realloc:A.
     684             :                          *
     685             :                          *    Memory barrier involvement:
     686             :                          *
     687             :                          *    If data_make_reusable:A reads from data_alloc:B,
     688             :                          *    then data_push_tail:C reads from
     689             :                          *    data_push_tail:D.
     690             :                          *
     691             :                          *    Relies on:
     692             :                          *
     693             :                          *    MB from data_push_tail:D to data_alloc:B
     694             :                          *       matching
     695             :                          *    RMB from data_make_reusable:A to
     696             :                          *    data_push_tail:C
     697             :                          *
     698             :                          *    Note: data_push_tail:D and data_alloc:B can be
     699             :                          *          different CPUs. However, the data_alloc:B
     700             :                          *          CPU (which performs the full memory
     701             :                          *          barrier) must have previously seen
     702             :                          *          data_push_tail:D.
     703             :                          *
     704             :                          * 2. Guarantee the descriptor state loaded in
     705             :                          *    data_make_reusable() is performed before
     706             :                          *    reloading the tail lpos. The failed
     707             :                          *    data_make_reusable() may be due to a newly
     708             :                          *    recycled descriptor causing the tail lpos to
     709             :                          *    have been previously pushed. This pairs with
     710             :                          *    desc_reserve:D.
     711             :                          *
     712             :                          *    Memory barrier involvement:
     713             :                          *
     714             :                          *    If data_make_reusable:B reads from
     715             :                          *    desc_reserve:F, then data_push_tail:C reads
     716             :                          *    from data_push_tail:D.
     717             :                          *
     718             :                          *    Relies on:
     719             :                          *
     720             :                          *    MB from data_push_tail:D to desc_reserve:F
     721             :                          *       matching
     722             :                          *    RMB from data_make_reusable:B to
     723             :                          *    data_push_tail:C
     724             :                          *
     725             :                          *    Note: data_push_tail:D and desc_reserve:F can
     726             :                          *          be different CPUs. However, the
     727             :                          *          desc_reserve:F CPU (which performs the
     728             :                          *          full memory barrier) must have previously
     729             :                          *          seen data_push_tail:D.
     730             :                          */
     731           0 :                         smp_rmb(); /* LMM(data_push_tail:B) */
     732             : 
     733           0 :                         tail_lpos_new = atomic_long_read(&data_ring->tail_lpos
     734             :                                                         ); /* LMM(data_push_tail:C) */
     735           0 :                         if (tail_lpos_new == tail_lpos)
     736             :                                 return false;
     737             : 
     738             :                         /* Another CPU pushed the tail. Try again. */
     739           0 :                         tail_lpos = tail_lpos_new;
     740           0 :                         continue;
     741             :                 }
     742             : 
     743             :                 /*
     744             :                  * Guarantee any descriptor states that have transitioned to
     745             :                  * reusable are stored before pushing the tail lpos. A full
     746             :                  * memory barrier is needed since other CPUs may have made
     747             :                  * the descriptor states reusable. This pairs with
     748             :                  * data_push_tail:A.
     749             :                  */
     750           0 :                 if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
     751             :                                             next_lpos)) { /* LMM(data_push_tail:D) */
     752             :                         break;
     753             :                 }
     754             :         }
     755             : 
     756             :         return true;
     757             : }
     758             : 
     759             : /*
     760             :  * Advance the desc ring tail. This function advances the tail by one
     761             :  * descriptor, thus invalidating the oldest descriptor. Before advancing
     762             :  * the tail, the tail descriptor is made reusable and all data blocks up to
     763             :  * and including the descriptor's data block are invalidated (i.e. the data
     764             :  * ring tail is pushed past the data block of the descriptor being made
     765             :  * reusable).
     766             :  */
     767           0 : static bool desc_push_tail(struct printk_ringbuffer *rb,
     768             :                            unsigned long tail_id)
     769             : {
     770           0 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
     771             :         enum desc_state d_state;
     772             :         struct prb_desc desc;
     773             : 
     774           0 :         d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL);
     775             : 
     776           0 :         switch (d_state) {
     777             :         case desc_miss:
     778             :                 /*
     779             :                  * If the ID is exactly 1 wrap behind the expected, it is
     780             :                  * in the process of being reserved by another writer and
     781             :                  * must be considered reserved.
     782             :                  */
     783           0 :                 if (DESC_ID(atomic_long_read(&desc.state_var)) ==
     784           0 :                     DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
     785             :                         return false;
     786             :                 }
     787             : 
     788             :                 /*
     789             :                  * The ID has changed. Another writer must have pushed the
     790             :                  * tail and recycled the descriptor already. Success is
     791             :                  * returned because the caller is only interested in the
     792             :                  * specified tail being pushed, which it was.
     793             :                  */
     794           0 :                 return true;
     795             :         case desc_reserved:
     796             :         case desc_committed:
     797             :                 return false;
     798             :         case desc_finalized:
     799             :                 desc_make_reusable(desc_ring, tail_id);
     800             :                 break;
     801             :         case desc_reusable:
     802             :                 break;
     803             :         }
     804             : 
     805             :         /*
     806             :          * Data blocks must be invalidated before their associated
     807             :          * descriptor can be made available for recycling. Invalidating
     808             :          * them later is not possible because there is no way to trust
     809             :          * data blocks once their associated descriptor is gone.
     810             :          */
     811             : 
     812           0 :         if (!data_push_tail(rb, desc.text_blk_lpos.next))
     813             :                 return false;
     814             : 
     815             :         /*
     816             :          * Check the next descriptor after @tail_id before pushing the tail
     817             :          * to it because the tail must always be in a finalized or reusable
     818             :          * state. The implementation of prb_first_seq() relies on this.
     819             :          *
     820             :          * A successful read implies that the next descriptor is less than or
     821             :          * equal to @head_id so there is no risk of pushing the tail past the
     822             :          * head.
     823             :          */
     824           0 :         d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc,
     825             :                             NULL, NULL); /* LMM(desc_push_tail:A) */
     826             : 
     827           0 :         if (d_state == desc_finalized || d_state == desc_reusable) {
     828             :                 /*
     829             :                  * Guarantee any descriptor states that have transitioned to
     830             :                  * reusable are stored before pushing the tail ID. This allows
     831             :                  * verifying the recycled descriptor state. A full memory
     832             :                  * barrier is needed since other CPUs may have made the
     833             :                  * descriptor states reusable. This pairs with desc_reserve:D.
     834             :                  */
     835           0 :                 atomic_long_cmpxchg(&desc_ring->tail_id, tail_id,
     836           0 :                                     DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
     837             :         } else {
     838             :                 /*
     839             :                  * Guarantee the last state load from desc_read() is before
     840             :                  * reloading @tail_id in order to see a new tail ID in the
     841             :                  * case that the descriptor has been recycled. This pairs
     842             :                  * with desc_reserve:D.
     843             :                  *
     844             :                  * Memory barrier involvement:
     845             :                  *
     846             :                  * If desc_push_tail:A reads from desc_reserve:F, then
     847             :                  * desc_push_tail:D reads from desc_push_tail:B.
     848             :                  *
     849             :                  * Relies on:
     850             :                  *
     851             :                  * MB from desc_push_tail:B to desc_reserve:F
     852             :                  *    matching
     853             :                  * RMB from desc_push_tail:A to desc_push_tail:D
     854             :                  *
     855             :                  * Note: desc_push_tail:B and desc_reserve:F can be different
     856             :                  *       CPUs. However, the desc_reserve:F CPU (which performs
     857             :                  *       the full memory barrier) must have previously seen
     858             :                  *       desc_push_tail:B.
     859             :                  */
     860           0 :                 smp_rmb(); /* LMM(desc_push_tail:C) */
     861             : 
     862             :                 /*
     863             :                  * Re-check the tail ID. The descriptor following @tail_id is
     864             :                  * not in an allowed tail state. But if the tail has since
     865             :                  * been moved by another CPU, then it does not matter.
     866             :                  */
     867           0 :                 if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */
     868             :                         return false;
     869             :         }
     870             : 
     871             :         return true;
     872             : }
     873             : 
     874             : /* Reserve a new descriptor, invalidating the oldest if necessary. */
     875         265 : static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
     876             : {
     877         265 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
     878             :         unsigned long prev_state_val;
     879             :         unsigned long id_prev_wrap;
     880             :         struct prb_desc *desc;
     881             :         unsigned long head_id;
     882             :         unsigned long id;
     883             : 
     884         530 :         head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
     885             : 
     886             :         do {
     887         265 :                 id = DESC_ID(head_id + 1);
     888         265 :                 id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
     889             : 
     890             :                 /*
     891             :                  * Guarantee the head ID is read before reading the tail ID.
     892             :                  * Since the tail ID is updated before the head ID, this
     893             :                  * guarantees that @id_prev_wrap is never ahead of the tail
     894             :                  * ID. This pairs with desc_reserve:D.
     895             :                  *
     896             :                  * Memory barrier involvement:
     897             :                  *
     898             :                  * If desc_reserve:A reads from desc_reserve:D, then
     899             :                  * desc_reserve:C reads from desc_push_tail:B.
     900             :                  *
     901             :                  * Relies on:
     902             :                  *
     903             :                  * MB from desc_push_tail:B to desc_reserve:D
     904             :                  *    matching
     905             :                  * RMB from desc_reserve:A to desc_reserve:C
     906             :                  *
     907             :                  * Note: desc_push_tail:B and desc_reserve:D can be different
     908             :                  *       CPUs. However, the desc_reserve:D CPU (which performs
     909             :                  *       the full memory barrier) must have previously seen
     910             :                  *       desc_push_tail:B.
     911             :                  */
     912         265 :                 smp_rmb(); /* LMM(desc_reserve:B) */
     913             : 
     914         530 :                 if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
     915             :                                                     )) { /* LMM(desc_reserve:C) */
     916             :                         /*
     917             :                          * Make space for the new descriptor by
     918             :                          * advancing the tail.
     919             :                          */
     920           0 :                         if (!desc_push_tail(rb, id_prev_wrap))
     921             :                                 return false;
     922             :                 }
     923             : 
     924             :                 /*
     925             :                  * 1. Guarantee the tail ID is read before validating the
     926             :                  *    recycled descriptor state. A read memory barrier is
     927             :                  *    sufficient for this. This pairs with desc_push_tail:B.
     928             :                  *
     929             :                  *    Memory barrier involvement:
     930             :                  *
     931             :                  *    If desc_reserve:C reads from desc_push_tail:B, then
     932             :                  *    desc_reserve:E reads from desc_make_reusable:A.
     933             :                  *
     934             :                  *    Relies on:
     935             :                  *
     936             :                  *    MB from desc_make_reusable:A to desc_push_tail:B
     937             :                  *       matching
     938             :                  *    RMB from desc_reserve:C to desc_reserve:E
     939             :                  *
     940             :                  *    Note: desc_make_reusable:A and desc_push_tail:B can be
     941             :                  *          different CPUs. However, the desc_push_tail:B CPU
     942             :                  *          (which performs the full memory barrier) must have
     943             :                  *          previously seen desc_make_reusable:A.
     944             :                  *
     945             :                  * 2. Guarantee the tail ID is stored before storing the head
     946             :                  *    ID. This pairs with desc_reserve:B.
     947             :                  *
     948             :                  * 3. Guarantee any data ring tail changes are stored before
     949             :                  *    recycling the descriptor. Data ring tail changes can
     950             :                  *    happen via desc_push_tail()->data_push_tail(). A full
     951             :                  *    memory barrier is needed since another CPU may have
     952             :                  *    pushed the data ring tails. This pairs with
     953             :                  *    data_push_tail:B.
     954             :                  *
     955             :                  * 4. Guarantee a new tail ID is stored before recycling the
     956             :                  *    descriptor. A full memory barrier is needed since
     957             :                  *    another CPU may have pushed the tail ID. This pairs
     958             :                  *    with desc_push_tail:C and this also pairs with
     959             :                  *    prb_first_seq:C.
     960             :                  *
     961             :                  * 5. Guarantee the head ID is stored before trying to
     962             :                  *    finalize the previous descriptor. This pairs with
     963             :                  *    _prb_commit:B.
     964             :                  */
     965         530 :         } while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
     966         265 :                                           id)); /* LMM(desc_reserve:D) */
     967             : 
     968         530 :         desc = to_desc(desc_ring, id);
     969             : 
     970             :         /*
     971             :          * If the descriptor has been recycled, verify the old state val.
     972             :          * See "ABA Issues" about why this verification is performed.
     973             :          */
     974         530 :         prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
     975         265 :         if (prev_state_val &&
     976           0 :             get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
     977           0 :                 WARN_ON_ONCE(1);
     978             :                 return false;
     979             :         }
     980             : 
     981             :         /*
     982             :          * Assign the descriptor a new ID and set its state to reserved.
     983             :          * See "ABA Issues" about why cmpxchg() instead of set() is used.
     984             :          *
     985             :          * Guarantee the new descriptor ID and state is stored before making
     986             :          * any other changes. A write memory barrier is sufficient for this.
     987             :          * This pairs with desc_read:D.
     988             :          */
     989         530 :         if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
     990             :                         DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */
     991           0 :                 WARN_ON_ONCE(1);
     992             :                 return false;
     993             :         }
     994             : 
     995             :         /* Now data in @desc can be modified: LMM(desc_reserve:G) */
     996             : 
     997         265 :         *id_out = id;
     998         265 :         return true;
     999             : }
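
The loop above is a standard lock-free head-advance pattern: read the head, compute the next ID, make room by pushing the tail, then try to claim the ID with a compare-and-swap, retrying if another writer advanced the head first. The snippet below is a minimal userspace sketch of that pattern using C11 atomics; ring_reserve() and its simplifications (no tail handling, no state packing) are illustrative only and are not part of the kernel code.

    #include <stdatomic.h>
    #include <stdbool.h>

    static _Atomic unsigned long head_id;

    /*
     * Claim the next ID. Mirrors the shape of desc_reserve(): the
     * compare-and-swap (cf. desc_reserve:D) is the only step that can
     * fail due to a concurrent writer; everything before it is redone
     * on retry with the freshly observed head value.
     */
    static bool ring_reserve(unsigned long *id_out)
    {
            unsigned long head = atomic_load(&head_id);
            unsigned long id;

            do {
                    id = head + 1;
                    /*
                     * The kernel makes room here by recycling the oldest
                     * descriptor (desc_push_tail()); this sketch assumes
                     * space is always available.
                     */
            } while (!atomic_compare_exchange_weak(&head_id, &head, id));

            *id_out = id;
            return true;
    }

    int main(void)
    {
            unsigned long id;

            ring_reserve(&id);      /* first reservation yields ID 1 */
            return 0;
    }
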
    1000             : 
    1001             : /* Determine the end of a data block. */
    1002             : static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
    1003             :                                    unsigned long lpos, unsigned int size)
    1004             : {
    1005             :         unsigned long begin_lpos;
    1006             :         unsigned long next_lpos;
    1007             : 
    1008         284 :         begin_lpos = lpos;
    1009         284 :         next_lpos = lpos + size;
    1010             : 
    1011             :         /* First check if the data block does not wrap. */
    1012         284 :         if (DATA_WRAPS(data_ring, begin_lpos) == DATA_WRAPS(data_ring, next_lpos))
    1013             :                 return next_lpos;
    1014             : 
    1015             :         /* Wrapping data blocks store their data at the beginning. */
    1016           0 :         return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
    1017             : }
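
Logical positions grow without bound: the high bits count how many times the ring has wrapped and the low bits give the offset within the ring. When a block would straddle the end of the ring, its data is placed at the start of the next wrap, so the returned end position also jumps past the wasted tail space. A minimal sketch of that arithmetic for an assumed 1 KiB data ring (the single-argument macro forms and the sizes are illustrative; the kernel macros take the data_ring and derive the size from it):

    #include <stdio.h>

    #define DATA_SIZE_BITS  10
    #define DATA_SIZE       (1UL << DATA_SIZE_BITS)            /* 1024 bytes  */
    #define DATA_WRAPS(lpos)      ((lpos) >> DATA_SIZE_BITS)   /* wrap count  */
    #define WRAP_START(lpos)      ((lpos) & ~(DATA_SIZE - 1))  /* wrap origin */

    static unsigned long next_lpos(unsigned long lpos, unsigned int size)
    {
            unsigned long next = lpos + size;

            if (DATA_WRAPS(lpos) == DATA_WRAPS(next))
                    return next;                    /* fits in this wrap */

            /* Wrapping block: data starts at the beginning of the next wrap. */
            return WRAP_START(next) + size;
    }

    int main(void)
    {
            printf("%lu\n", next_lpos(100, 64));   /* 164: no wrap                */
            printf("%lu\n", next_lpos(1000, 64));  /* 1088: 24 bytes wasted,      */
                                                   /* data occupies 1024..1087    */
            return 0;
    }
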
    1018             : 
    1019             : /*
    1020             :  * Allocate a new data block, invalidating the oldest data block(s)
    1021             :  * if necessary. This function also associates the data block with
    1022             :  * a specified descriptor.
    1023             :  */
    1024         265 : static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
    1025             :                         struct prb_data_blk_lpos *blk_lpos, unsigned long id)
    1026             : {
    1027         265 :         struct prb_data_ring *data_ring = &rb->text_data_ring;
    1028             :         struct prb_data_block *blk;
    1029             :         unsigned long begin_lpos;
    1030             :         unsigned long next_lpos;
    1031             : 
    1032         265 :         if (size == 0) {
    1033             :                 /* Specify a data-less block. */
    1034           0 :                 blk_lpos->begin = NO_LPOS;
    1035           0 :                 blk_lpos->next = NO_LPOS;
    1036           0 :                 return NULL;
    1037             :         }
    1038             : 
    1039         265 :         size = to_blk_size(size);
    1040             : 
    1041         530 :         begin_lpos = atomic_long_read(&data_ring->head_lpos);
    1042             : 
    1043             :         do {
    1044         530 :                 next_lpos = get_next_lpos(data_ring, begin_lpos, size);
    1045             : 
    1046         265 :                 if (!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
    1047             :                         /* Failed to allocate, specify a data-less block. */
    1048           0 :                         blk_lpos->begin = FAILED_LPOS;
    1049           0 :                         blk_lpos->next = FAILED_LPOS;
    1050           0 :                         return NULL;
    1051             :                 }
    1052             : 
    1053             :                 /*
    1054             :                  * 1. Guarantee any descriptor states that have transitioned
    1055             :                  *    to reusable are stored before modifying the newly
    1056             :                  *    allocated data area. A full memory barrier is needed
    1057             :                  *    since other CPUs may have made the descriptor states
    1058             :                  *    reusable. See data_push_tail:A about why the reusable
    1059             :                  *    states are visible. This pairs with desc_read:D.
    1060             :                  *
    1061             :                  * 2. Guarantee any updated tail lpos is stored before
    1062             :                  *    modifying the newly allocated data area. Another CPU may
    1063             :                  *    be in data_make_reusable() and is reading a block ID
    1064             :                  *    from this area. data_make_reusable() can handle reading
    1065             :                  *    a garbage block ID value, but then it must be able to
    1066             :                  *    load a new tail lpos. A full memory barrier is needed
    1067             :                  *    since other CPUs may have updated the tail lpos. This
    1068             :                  *    pairs with data_push_tail:B.
    1069             :                  */
    1070         530 :         } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
    1071         265 :                                           next_lpos)); /* LMM(data_alloc:A) */
    1072             : 
    1073         530 :         blk = to_block(data_ring, begin_lpos);
    1074         265 :         blk->id = id; /* LMM(data_alloc:B) */
    1075             : 
    1076         265 :         if (DATA_WRAPS(data_ring, begin_lpos) != DATA_WRAPS(data_ring, next_lpos)) {
    1077             :                 /* Wrapping data blocks store their data at the beginning. */
    1078           0 :                 blk = to_block(data_ring, 0);
    1079             : 
    1080             :                 /*
    1081             :                  * Store the ID on the wrapped block for consistency.
    1082             :                  * The printk_ringbuffer does not actually use it.
    1083             :                  */
    1084           0 :                 blk->id = id;
    1085             :         }
    1086             : 
    1087         265 :         blk_lpos->begin = begin_lpos;
    1088         265 :         blk_lpos->next = next_lpos;
    1089             : 
    1090         265 :         return &blk->data[0];
    1091             : }
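
Each allocated block starts with the owning descriptor ID, followed by the writer's bytes; to_blk_size() (defined earlier in this file) grows the requested size to cover the ID and to keep the next block's ID naturally aligned. A rough, self-contained sketch of that layout and size calculation (the exact rounding rule shown is an assumption made for illustration):

    #include <stdio.h>

    /* A data block: the ID of the descriptor that owns it, then the text. */
    struct data_block {
            unsigned long id;
            char data[];
    };

    #define ALIGN_UP(x, a)  (((x) + (a) - 1) & ~((unsigned long)(a) - 1))

    /* Space a @size-byte text string occupies in the data ring (sketch). */
    static unsigned long blk_size(unsigned long size)
    {
            size += sizeof(struct data_block);            /* room for the ID  */
            return ALIGN_UP(size, sizeof(unsigned long)); /* keep IDs aligned */
    }

    int main(void)
    {
            /* On a 64-bit build: 13 + 8 = 21, rounded up to 24 bytes. */
            printf("%lu\n", blk_size(13));
            return 0;
    }
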
    1092             : 
    1093             : /*
    1094             :  * Try to resize an existing data block associated with the descriptor
    1095             :  * specified by @id. If the resized data block should become wrapped, it
    1096             :  * copies the old data to the new data block. If @size yields a data block
     1097             :  * with the same or a smaller size, the data block is left as is.
    1098             :  *
    1099             :  * Fail if this is not the last allocated data block or if there is not
     1100             :  * enough space or it is not possible to make enough space.
    1101             :  *
    1102             :  * Return a pointer to the beginning of the entire data buffer or NULL on
    1103             :  * failure.
    1104             :  */
    1105          19 : static char *data_realloc(struct printk_ringbuffer *rb, unsigned int size,
    1106             :                           struct prb_data_blk_lpos *blk_lpos, unsigned long id)
    1107             : {
    1108          19 :         struct prb_data_ring *data_ring = &rb->text_data_ring;
    1109             :         struct prb_data_block *blk;
    1110             :         unsigned long head_lpos;
    1111             :         unsigned long next_lpos;
    1112             :         bool wrapped;
    1113             : 
    1114             :         /* Reallocation only works if @blk_lpos is the newest data block. */
    1115          38 :         head_lpos = atomic_long_read(&data_ring->head_lpos);
    1116          19 :         if (head_lpos != blk_lpos->next)
    1117             :                 return NULL;
    1118             : 
    1119             :         /* Keep track if @blk_lpos was a wrapping data block. */
    1120          19 :         wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
    1121             : 
    1122          19 :         size = to_blk_size(size);
    1123             : 
    1124          38 :         next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
    1125             : 
    1126             :         /* If the data block does not increase, there is nothing to do. */
    1127          19 :         if (head_lpos - next_lpos < DATA_SIZE(data_ring)) {
    1128           5 :                 if (wrapped)
    1129           0 :                         blk = to_block(data_ring, 0);
    1130             :                 else
    1131          10 :                         blk = to_block(data_ring, blk_lpos->begin);
    1132           5 :                 return &blk->data[0];
    1133             :         }
    1134             : 
    1135          14 :         if (!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring)))
    1136             :                 return NULL;
    1137             : 
    1138             :         /* The memory barrier involvement is the same as data_alloc:A. */
    1139          28 :         if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
    1140             :                                      next_lpos)) { /* LMM(data_realloc:A) */
    1141             :                 return NULL;
    1142             :         }
    1143             : 
    1144          28 :         blk = to_block(data_ring, blk_lpos->begin);
    1145             : 
    1146          14 :         if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
    1147           0 :                 struct prb_data_block *old_blk = blk;
    1148             : 
    1149             :                 /* Wrapping data blocks store their data at the beginning. */
    1150           0 :                 blk = to_block(data_ring, 0);
    1151             : 
    1152             :                 /*
    1153             :                  * Store the ID on the wrapped block for consistency.
    1154             :                  * The printk_ringbuffer does not actually use it.
    1155             :                  */
    1156           0 :                 blk->id = id;
    1157             : 
    1158           0 :                 if (!wrapped) {
    1159             :                         /*
    1160             :                          * Since the allocated space is now in the newly
    1161             :                          * created wrapping data block, copy the content
    1162             :                          * from the old data block.
    1163             :                          */
    1164           0 :                         memcpy(&blk->data[0], &old_blk->data[0],
    1165             :                                (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
    1166             :                 }
    1167             :         }
    1168             : 
    1169          14 :         blk_lpos->next = next_lpos;
    1170             : 
    1171          14 :         return &blk->data[0];
    1172             : }
    1173             : 
    1174             : /* Return the number of bytes used by a data block. */
    1175         284 : static unsigned int space_used(struct prb_data_ring *data_ring,
    1176             :                                struct prb_data_blk_lpos *blk_lpos)
    1177             : {
    1178             :         /* Data-less blocks take no space. */
    1179         284 :         if (BLK_DATALESS(blk_lpos))
    1180             :                 return 0;
    1181             : 
    1182         284 :         if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next)) {
    1183             :                 /* Data block does not wrap. */
    1184         568 :                 return (DATA_INDEX(data_ring, blk_lpos->next) -
    1185         284 :                         DATA_INDEX(data_ring, blk_lpos->begin));
    1186             :         }
    1187             : 
    1188             :         /*
    1189             :          * For wrapping data blocks, the trailing (wasted) space is
    1190             :          * also counted.
    1191             :          */
    1192           0 :         return (DATA_INDEX(data_ring, blk_lpos->next) +
    1193           0 :                 DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin));
    1194             : }
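
Concretely, a block that stays within one wrap only costs the difference of its two indexes, while a wrapping block is also charged for the unusable bytes it left at the end of the previous wrap. A small numeric sketch reusing the assumed 1 KiB ring from the earlier example:

    #include <stdio.h>

    #define DATA_SIZE_BITS  10
    #define DATA_SIZE       (1UL << DATA_SIZE_BITS)
    #define DATA_WRAPS(lpos)  ((lpos) >> DATA_SIZE_BITS)
    #define DATA_INDEX(lpos)  ((lpos) & (DATA_SIZE - 1))

    static unsigned long used(unsigned long begin, unsigned long next)
    {
            if (DATA_WRAPS(begin) == DATA_WRAPS(next))
                    return DATA_INDEX(next) - DATA_INDEX(begin);

            /* Wrapping block: the trailing waste of the old wrap counts too. */
            return DATA_INDEX(next) + DATA_SIZE - DATA_INDEX(begin);
    }

    int main(void)
    {
            printf("%lu\n", used(100, 164));    /* 64: plain block              */
            printf("%lu\n", used(1000, 1088));  /* 88 = 64-byte block + 24 wasted */
            return 0;
    }
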
    1195             : 
    1196             : /*
    1197             :  * Given @blk_lpos, return a pointer to the writer data from the data block
    1198             :  * and calculate the size of the data part. A NULL pointer is returned if
    1199             :  * @blk_lpos specifies values that could never be legal.
    1200             :  *
    1201             :  * This function (used by readers) performs strict validation on the lpos
    1202             :  * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
    1203             :  * triggered if an internal error is detected.
    1204             :  */
    1205         442 : static const char *get_data(struct prb_data_ring *data_ring,
    1206             :                             struct prb_data_blk_lpos *blk_lpos,
    1207             :                             unsigned int *data_size)
    1208             : {
    1209             :         struct prb_data_block *db;
    1210             : 
    1211             :         /* Data-less data block description. */
    1212         442 :         if (BLK_DATALESS(blk_lpos)) {
    1213           0 :                 if (blk_lpos->begin == NO_LPOS && blk_lpos->next == NO_LPOS) {
    1214           0 :                         *data_size = 0;
    1215           0 :                         return "";
    1216             :                 }
    1217             :                 return NULL;
    1218             :         }
    1219             : 
    1220             :         /* Regular data block: @begin less than @next and in same wrap. */
    1221         442 :         if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next) &&
    1222             :             blk_lpos->begin < blk_lpos->next) {
    1223         884 :                 db = to_block(data_ring, blk_lpos->begin);
    1224         442 :                 *data_size = blk_lpos->next - blk_lpos->begin;
    1225             : 
    1226             :         /* Wrapping data block: @begin is one wrap behind @next. */
    1227           0 :         } else if (DATA_WRAPS(data_ring, blk_lpos->begin + DATA_SIZE(data_ring)) ==
    1228             :                    DATA_WRAPS(data_ring, blk_lpos->next)) {
    1229           0 :                 db = to_block(data_ring, 0);
    1230           0 :                 *data_size = DATA_INDEX(data_ring, blk_lpos->next);
    1231             : 
    1232             :         /* Illegal block description. */
    1233             :         } else {
    1234           0 :                 WARN_ON_ONCE(1);
    1235             :                 return NULL;
    1236             :         }
    1237             : 
    1238             :         /* A valid data block will always be aligned to the ID size. */
    1239         884 :         if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) ||
    1240         442 :             WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) {
    1241             :                 return NULL;
    1242             :         }
    1243             : 
    1244             :         /* A valid data block will always have at least an ID. */
    1245         442 :         if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
    1246             :                 return NULL;
    1247             : 
    1248             :         /* Subtract block ID space from size to reflect data size. */
    1249         442 :         *data_size -= sizeof(db->id);
    1250             : 
    1251         442 :         return &db->data[0];
    1252             : }
    1253             : 
    1254             : /*
    1255             :  * Attempt to transition the newest descriptor from committed back to reserved
    1256             :  * so that the record can be modified by a writer again. This is only possible
    1257             :  * if the descriptor is not yet finalized and the provided @caller_id matches.
    1258             :  */
    1259          22 : static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
    1260             :                                          u32 caller_id, unsigned long *id_out)
    1261             : {
    1262             :         unsigned long prev_state_val;
    1263             :         enum desc_state d_state;
    1264             :         struct prb_desc desc;
    1265             :         struct prb_desc *d;
    1266             :         unsigned long id;
    1267             :         u32 cid;
    1268             : 
    1269          44 :         id = atomic_long_read(&desc_ring->head_id);
    1270             : 
    1271             :         /*
     1272             :          * To reduce unnecessary reopening, first check if the descriptor
    1273             :          * state and caller ID are correct.
    1274             :          */
    1275          22 :         d_state = desc_read(desc_ring, id, &desc, NULL, &cid);
    1276          22 :         if (d_state != desc_committed || cid != caller_id)
    1277             :                 return NULL;
    1278             : 
    1279          38 :         d = to_desc(desc_ring, id);
    1280             : 
    1281          19 :         prev_state_val = DESC_SV(id, desc_committed);
    1282             : 
    1283             :         /*
    1284             :          * Guarantee the reserved state is stored before reading any
    1285             :          * record data. A full memory barrier is needed because @state_var
    1286             :          * modification is followed by reading. This pairs with _prb_commit:B.
    1287             :          *
    1288             :          * Memory barrier involvement:
    1289             :          *
    1290             :          * If desc_reopen_last:A reads from _prb_commit:B, then
    1291             :          * prb_reserve_in_last:A reads from _prb_commit:A.
    1292             :          *
    1293             :          * Relies on:
    1294             :          *
    1295             :          * WMB from _prb_commit:A to _prb_commit:B
    1296             :          *    matching
     1297             :          * MB from desc_reopen_last:A to prb_reserve_in_last:A
    1298             :          */
    1299          38 :         if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
    1300             :                         DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
    1301             :                 return NULL;
    1302             :         }
    1303             : 
    1304          19 :         *id_out = id;
    1305          19 :         return d;
    1306             : }
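
desc_reopen_last() works because the descriptor ID and its state live in one word: the compare-and-swap above only succeeds if the descriptor still carries the expected ID and is still in the committed state, so a recycled or finalized descriptor is rejected atomically. A toy sketch of such packing (the real bit layout is defined in printk_ringbuffer.h; here the two high bits are assumed to hold the state):

    #include <stdio.h>

    #define SV_BITS       (sizeof(unsigned long) * 8)
    #define STATE_SHIFT   (SV_BITS - 2)              /* assumed: 2 state bits */
    #define ID_MASK       ((1UL << STATE_SHIFT) - 1)

    enum state { st_reserved, st_committed, st_finalized, st_reusable };

    #define MAKE_SV(id, st)  (((unsigned long)(st) << STATE_SHIFT) | (id))
    #define SV_ID(sv)        ((sv) & ID_MASK)
    #define SV_STATE(sv)     ((enum state)((sv) >> STATE_SHIFT))

    int main(void)
    {
            unsigned long sv = MAKE_SV(41, st_committed);

            printf("id=%lu state=%d\n", SV_ID(sv), (int)SV_STATE(sv));

            /*
             * Reopening is then a single compare-and-swap of the word:
             * expect MAKE_SV(41, st_committed), store MAKE_SV(41, st_reserved).
             * Any ID or state mismatch makes the cmpxchg fail.
             */
            return 0;
    }
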
    1307             : 
    1308             : /**
    1309             :  * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
    1310             :  *                         used by the newest record.
    1311             :  *
    1312             :  * @e:         The entry structure to setup.
    1313             :  * @rb:        The ringbuffer to re-reserve and extend data in.
    1314             :  * @r:         The record structure to allocate buffers for.
    1315             :  * @caller_id: The caller ID of the caller (reserving writer).
    1316             :  * @max_size:  Fail if the extended size would be greater than this.
    1317             :  *
    1318             :  * This is the public function available to writers to re-reserve and extend
    1319             :  * data.
    1320             :  *
    1321             :  * The writer specifies the text size to extend (not the new total size) by
    1322             :  * setting the @text_buf_size field of @r. To ensure proper initialization
    1323             :  * of @r, prb_rec_init_wr() should be used.
    1324             :  *
    1325             :  * This function will fail if @caller_id does not match the caller ID of the
    1326             :  * newest record. In that case the caller must reserve new data using
    1327             :  * prb_reserve().
    1328             :  *
    1329             :  * Context: Any context. Disables local interrupts on success.
    1330             :  * Return: true if text data could be extended, otherwise false.
    1331             :  *
    1332             :  * On success:
    1333             :  *
    1334             :  *   - @r->text_buf points to the beginning of the entire text buffer.
    1335             :  *
    1336             :  *   - @r->text_buf_size is set to the new total size of the buffer.
    1337             :  *
     1338             :  *   - @r->info is not touched so that @r->info->text_len can be used
    1339             :  *     to append the text.
    1340             :  *
    1341             :  *   - prb_record_text_space() can be used on @e to query the new
    1342             :  *     actually used space.
    1343             :  *
    1344             :  * Important: All @r->info fields will already be set with the current values
    1345             :  *            for the record. I.e. @r->info->text_len will be less than
    1346             :  *            @text_buf_size. Writers can use @r->info->text_len to know
    1347             :  *            where concatenation begins and writers should update
    1348             :  *            @r->info->text_len after concatenating.
    1349             :  */
    1350          22 : bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
    1351             :                          struct printk_record *r, u32 caller_id, unsigned int max_size)
    1352             : {
    1353          22 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
    1354             :         struct printk_info *info;
    1355             :         unsigned int data_size;
    1356             :         struct prb_desc *d;
    1357             :         unsigned long id;
    1358             : 
    1359          22 :         local_irq_save(e->irqflags);
    1360             : 
    1361             :         /* Transition the newest descriptor back to the reserved state. */
    1362          22 :         d = desc_reopen_last(desc_ring, caller_id, &id);
    1363          22 :         if (!d) {
    1364           3 :                 local_irq_restore(e->irqflags);
    1365             :                 goto fail_reopen;
    1366             :         }
    1367             : 
    1368             :         /* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */
    1369             : 
    1370          38 :         info = to_info(desc_ring, id);
    1371             : 
    1372             :         /*
    1373             :          * Set the @e fields here so that prb_commit() can be used if
    1374             :          * anything fails from now on.
    1375             :          */
    1376          19 :         e->rb = rb;
    1377          19 :         e->id = id;
    1378             : 
    1379             :         /*
    1380             :          * desc_reopen_last() checked the caller_id, but there was no
    1381             :          * exclusive access at that point. The descriptor may have
    1382             :          * changed since then.
    1383             :          */
    1384          19 :         if (caller_id != info->caller_id)
    1385             :                 goto fail;
    1386             : 
    1387          19 :         if (BLK_DATALESS(&d->text_blk_lpos)) {
    1388           0 :                 if (WARN_ON_ONCE(info->text_len != 0)) {
    1389           0 :                         pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
    1390             :                                      info->text_len);
    1391           0 :                         info->text_len = 0;
    1392             :                 }
    1393             : 
    1394           0 :                 if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
    1395             :                         goto fail;
    1396             : 
    1397           0 :                 if (r->text_buf_size > max_size)
    1398             :                         goto fail;
    1399             : 
    1400           0 :                 r->text_buf = data_alloc(rb, r->text_buf_size,
    1401             :                                          &d->text_blk_lpos, id);
    1402             :         } else {
    1403          19 :                 if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
    1404             :                         goto fail;
    1405             : 
    1406             :                 /*
    1407             :                  * Increase the buffer size to include the original size. If
    1408             :                  * the meta data (@text_len) is not sane, use the full data
    1409             :                  * block size.
    1410             :                  */
    1411          19 :                 if (WARN_ON_ONCE(info->text_len > data_size)) {
    1412           0 :                         pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
    1413             :                                      info->text_len, data_size);
    1414           0 :                         info->text_len = data_size;
    1415             :                 }
    1416          19 :                 r->text_buf_size += info->text_len;
    1417             : 
    1418          38 :                 if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
    1419             :                         goto fail;
    1420             : 
    1421          19 :                 if (r->text_buf_size > max_size)
    1422             :                         goto fail;
    1423             : 
    1424          19 :                 r->text_buf = data_realloc(rb, r->text_buf_size,
    1425             :                                            &d->text_blk_lpos, id);
    1426             :         }
    1427          19 :         if (r->text_buf_size && !r->text_buf)
    1428             :                 goto fail;
    1429             : 
    1430          19 :         r->info = info;
    1431             : 
    1432          19 :         e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
    1433             : 
    1434          19 :         return true;
    1435             : fail:
    1436           0 :         prb_commit(e);
    1437             :         /* prb_commit() re-enabled interrupts. */
    1438             : fail_reopen:
    1439             :         /* Make it clear to the caller that the re-reserve failed. */
    1440           6 :         memset(r, 0, sizeof(*r));
    1441           3 :         return false;
    1442             : }
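
Putting the documentation above together, a writer extends its own newest record roughly as follows: re-reserve, append after the existing @text_len, bump @text_len, and commit again; on failure it falls back to a normal prb_reserve(). The sketch below assumes the writer-side helpers declared in printk_ringbuffer.h (prb_rec_init_wr() in particular); rb, caller_id, max_size, new_text and len are assumed to be set up by the caller, and append_to_last() is an invented name used only for illustration.

    #include <linux/types.h>
    #include <linux/string.h>
    #include "printk_ringbuffer.h"

    /* Illustrative only: append @len bytes of @new_text to the newest record. */
    static void append_to_last(struct printk_ringbuffer *rb, u32 caller_id,
                               const char *new_text, unsigned int len,
                               unsigned int max_size)
    {
            struct prb_reserved_entry e;
            struct printk_record r;

            prb_rec_init_wr(&r, len);       /* size of the text to add */

            if (!prb_reserve_in_last(&e, rb, &r, caller_id, max_size)) {
                    /* Newest record finalized or owned by someone else:  */
                    /* the caller would reserve a fresh record instead.   */
                    return;
            }

            /* @text_len marks where the existing text ends; append there. */
            memcpy(&r.text_buf[r.info->text_len], new_text, len);
            r.info->text_len += len;

            prb_commit(&e);     /* still extendable until finalized */
    }
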
    1443             : 
    1444             : /*
    1445             :  * Attempt to finalize a specified descriptor. If this fails, the descriptor
    1446             :  * is either already final or it will finalize itself when the writer commits.
    1447             :  */
    1448             : static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
    1449             : {
    1450         264 :         unsigned long prev_state_val = DESC_SV(id, desc_committed);
    1451         528 :         struct prb_desc *d = to_desc(desc_ring, id);
    1452             : 
    1453         528 :         atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
    1454         264 :                         DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
    1455             : 
    1456             :         /* Best effort to remember the last finalized @id. */
    1457         528 :         atomic_long_set(&desc_ring->last_finalized_id, id);
    1458             : }
    1459             : 
    1460             : /**
    1461             :  * prb_reserve() - Reserve space in the ringbuffer.
    1462             :  *
    1463             :  * @e:  The entry structure to setup.
    1464             :  * @rb: The ringbuffer to reserve data in.
    1465             :  * @r:  The record structure to allocate buffers for.
    1466             :  *
    1467             :  * This is the public function available to writers to reserve data.
    1468             :  *
    1469             :  * The writer specifies the text size to reserve by setting the
    1470             :  * @text_buf_size field of @r. To ensure proper initialization of @r,
    1471             :  * prb_rec_init_wr() should be used.
    1472             :  *
    1473             :  * Context: Any context. Disables local interrupts on success.
    1474             :  * Return: true if at least text data could be allocated, otherwise false.
    1475             :  *
    1476             :  * On success, the fields @info and @text_buf of @r will be set by this
    1477             :  * function and should be filled in by the writer before committing. Also
    1478             :  * on success, prb_record_text_space() can be used on @e to query the actual
    1479             :  * space used for the text data block.
    1480             :  *
    1481             :  * Important: @info->text_len needs to be set correctly by the writer in
    1482             :  *            order for data to be readable and/or extended. Its value
    1483             :  *            is initialized to 0.
    1484             :  */
    1485         265 : bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
    1486             :                  struct printk_record *r)
    1487             : {
    1488         265 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
    1489             :         struct printk_info *info;
    1490             :         struct prb_desc *d;
    1491             :         unsigned long id;
    1492             :         u64 seq;
    1493             : 
    1494         530 :         if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
    1495             :                 goto fail;
    1496             : 
    1497             :         /*
    1498             :          * Descriptors in the reserved state act as blockers to all further
    1499             :          * reservations once the desc_ring has fully wrapped. Disable
    1500             :          * interrupts during the reserve/commit window in order to minimize
    1501             :          * the likelihood of this happening.
    1502             :          */
    1503         265 :         local_irq_save(e->irqflags);
    1504             : 
    1505         265 :         if (!desc_reserve(rb, &id)) {
    1506             :                 /* Descriptor reservation failures are tracked. */
    1507           0 :                 atomic_long_inc(&rb->fail);
    1508           0 :                 local_irq_restore(e->irqflags);
    1509             :                 goto fail;
    1510             :         }
    1511             : 
    1512         530 :         d = to_desc(desc_ring, id);
    1513         530 :         info = to_info(desc_ring, id);
    1514             : 
    1515             :         /*
    1516             :          * All @info fields (except @seq) are cleared and must be filled in
    1517             :          * by the writer. Save @seq before clearing because it is used to
    1518             :          * determine the new sequence number.
    1519             :          */
    1520         265 :         seq = info->seq;
    1521         530 :         memset(info, 0, sizeof(*info));
    1522             : 
    1523             :         /*
    1524             :          * Set the @e fields here so that prb_commit() can be used if
    1525             :          * text data allocation fails.
    1526             :          */
    1527         265 :         e->rb = rb;
    1528         265 :         e->id = id;
    1529             : 
    1530             :         /*
    1531             :          * Initialize the sequence number if it has "never been set".
    1532             :          * Otherwise just increment it by a full wrap.
    1533             :          *
    1534             :          * @seq is considered "never been set" if it has a value of 0,
     1535             :          * _except_ for @infos[0], which was specially set up by the ringbuffer
    1536             :          * initializer and therefore is always considered as set.
    1537             :          *
    1538             :          * See the "Bootstrap" comment block in printk_ringbuffer.h for
    1539             :          * details about how the initializer bootstraps the descriptors.
    1540             :          */
    1541         265 :         if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
    1542         264 :                 info->seq = DESC_INDEX(desc_ring, id);
    1543             :         else
    1544           1 :                 info->seq = seq + DESCS_COUNT(desc_ring);
    1545             : 
    1546             :         /*
    1547             :          * New data is about to be reserved. Once that happens, previous
    1548             :          * descriptors are no longer able to be extended. Finalize the
    1549             :          * previous descriptor now so that it can be made available to
    1550             :          * readers. (For seq==0 there is no previous descriptor.)
    1551             :          */
    1552         265 :         if (info->seq > 0)
    1553         264 :                 desc_make_final(desc_ring, DESC_ID(id - 1));
    1554             : 
    1555         265 :         r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
    1556             :         /* If text data allocation fails, a data-less record is committed. */
    1557         265 :         if (r->text_buf_size && !r->text_buf) {
    1558           0 :                 prb_commit(e);
    1559             :                 /* prb_commit() re-enabled interrupts. */
    1560           0 :                 goto fail;
    1561             :         }
    1562             : 
    1563         265 :         r->info = info;
    1564             : 
    1565             :         /* Record full text space used by record. */
    1566         265 :         e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
    1567             : 
    1568         265 :         return true;
    1569             : fail:
    1570             :         /* Make it clear to the caller that the reserve failed. */
    1571           0 :         memset(r, 0, sizeof(*r));
    1572           0 :         return false;
    1573             : }
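
For reference, the basic reserve/fill/commit sequence a writer follows, again as a hedged sketch: the helpers come from printk_ringbuffer.h, while rb, text, text_len and caller_id are assumed to be provided by the caller, and write_record() is an invented name. Filling in @info->text_len and @info->caller_id matters: the former makes the text readable, the latter allows a later prb_reserve_in_last() to extend the record.

    #include <linux/types.h>
    #include <linux/string.h>
    #include "printk_ringbuffer.h"

    /* Illustrative only: log @text_len bytes of @text as one record. */
    static bool write_record(struct printk_ringbuffer *rb, const char *text,
                             unsigned int text_len, u32 caller_id)
    {
            struct prb_reserved_entry e;
            struct printk_record r;

            prb_rec_init_wr(&r, text_len);

            if (!prb_reserve(&e, rb, &r))
                    return false;            /* nothing was reserved */

            memcpy(&r.text_buf[0], text, text_len);
            r.info->text_len = text_len;     /* readers rely on this        */
            r.info->caller_id = caller_id;   /* needed to extend this later */

            prb_final_commit(&e);            /* or prb_commit() to allow    */
                                             /* prb_reserve_in_last() later */
            return true;
    }
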
    1574             : 
    1575             : /* Commit the data (possibly finalizing it) and restore interrupts. */
    1576         284 : static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
    1577             : {
    1578         284 :         struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
    1579         568 :         struct prb_desc *d = to_desc(desc_ring, e->id);
    1580         284 :         unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);
    1581             : 
    1582             :         /* Now the writer has finished all writing: LMM(_prb_commit:A) */
    1583             : 
    1584             :         /*
    1585             :          * Set the descriptor as committed. See "ABA Issues" about why
    1586             :          * cmpxchg() instead of set() is used.
    1587             :          *
     1588             :          * 1. Guarantee all record data is stored before the descriptor state
    1589             :          *    is stored as committed. A write memory barrier is sufficient
    1590             :          *    for this. This pairs with desc_read:B and desc_reopen_last:A.
    1591             :          *
    1592             :          * 2. Guarantee the descriptor state is stored as committed before
    1593             :          *    re-checking the head ID in order to possibly finalize this
    1594             :          *    descriptor. This pairs with desc_reserve:D.
    1595             :          *
    1596             :          *    Memory barrier involvement:
    1597             :          *
    1598             :          *    If prb_commit:A reads from desc_reserve:D, then
    1599             :          *    desc_make_final:A reads from _prb_commit:B.
    1600             :          *
    1601             :          *    Relies on:
    1602             :          *
     1603             :          *    MB from _prb_commit:B to prb_commit:A
     1604             :          *       matching
     1605             :          *    MB from desc_reserve:D to desc_make_final:A
    1606             :          */
    1607         568 :         if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
    1608         284 :                         DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
    1609           0 :                 WARN_ON_ONCE(1);
    1610             :         }
    1611             : 
    1612             :         /* Restore interrupts, the reserve/commit window is finished. */
    1613         568 :         local_irq_restore(e->irqflags);
    1614         284 : }
    1615             : 
    1616             : /**
    1617             :  * prb_commit() - Commit (previously reserved) data to the ringbuffer.
    1618             :  *
    1619             :  * @e: The entry containing the reserved data information.
    1620             :  *
    1621             :  * This is the public function available to writers to commit data.
    1622             :  *
     1623             :  * Note that the data is not available to readers until it is finalized.
    1624             :  * Finalizing happens automatically when space for the next record is
    1625             :  * reserved.
    1626             :  *
    1627             :  * See prb_final_commit() for a version of this function that finalizes
    1628             :  * immediately.
    1629             :  *
    1630             :  * Context: Any context. Enables local interrupts.
    1631             :  */
    1632         153 : void prb_commit(struct prb_reserved_entry *e)
    1633             : {
    1634         153 :         struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
    1635             :         unsigned long head_id;
    1636             : 
    1637         153 :         _prb_commit(e, desc_committed);
    1638             : 
    1639             :         /*
    1640             :          * If this descriptor is no longer the head (i.e. a new record has
    1641             :          * been allocated), extending the data for this record is no longer
    1642             :          * allowed and therefore it must be finalized.
    1643             :          */
    1644         306 :         head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
    1645         153 :         if (head_id != e->id)
    1646           0 :                 desc_make_final(desc_ring, e->id);
    1647         153 : }
    1648             : 
    1649             : /**
    1650             :  * prb_final_commit() - Commit and finalize (previously reserved) data to
    1651             :  *                      the ringbuffer.
    1652             :  *
    1653             :  * @e: The entry containing the reserved data information.
    1654             :  *
    1655             :  * This is the public function available to writers to commit+finalize data.
    1656             :  *
    1657             :  * By finalizing, the data is made immediately available to readers.
    1658             :  *
    1659             :  * This function should only be used if there are no intentions of extending
    1660             :  * this data using prb_reserve_in_last().
    1661             :  *
    1662             :  * Context: Any context. Enables local interrupts.
    1663             :  */
    1664         131 : void prb_final_commit(struct prb_reserved_entry *e)
    1665             : {
    1666         131 :         struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
    1667             : 
    1668         131 :         _prb_commit(e, desc_finalized);
    1669             : 
    1670             :         /* Best effort to remember the last finalized @id. */
    1671         262 :         atomic_long_set(&desc_ring->last_finalized_id, e->id);
    1672         131 : }
    1673             : 
    1674             : /*
    1675             :  * Count the number of lines in provided text. All text has at least 1 line
    1676             :  * (even if @text_size is 0). Each '\n' processed is counted as an additional
    1677             :  * line.
    1678             :  */
    1679           0 : static unsigned int count_lines(const char *text, unsigned int text_size)
    1680             : {
    1681           0 :         unsigned int next_size = text_size;
    1682           0 :         unsigned int line_count = 1;
    1683           0 :         const char *next = text;
    1684             : 
    1685           0 :         while (next_size) {
    1686           0 :                 next = memchr(next, '\n', next_size);
    1687           0 :                 if (!next)
    1688             :                         break;
    1689           0 :                 line_count++;
    1690           0 :                 next++;
    1691           0 :                 next_size = text_size - (next - text);
    1692             :         }
    1693             : 
    1694           0 :         return line_count;
    1695             : }
    1696             : 
    1697             : /*
    1698             :  * Given @blk_lpos, copy an expected @len of data into the provided buffer.
    1699             :  * If @line_count is provided, count the number of lines in the data.
    1700             :  *
    1701             :  * This function (used by readers) performs strict validation on the data
    1702             :  * size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
    1703             :  * triggered if an internal error is detected.
    1704             :  */
    1705         423 : static bool copy_data(struct prb_data_ring *data_ring,
    1706             :                       struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
    1707             :                       unsigned int buf_size, unsigned int *line_count)
    1708             : {
    1709             :         unsigned int data_size;
    1710             :         const char *data;
    1711             : 
    1712             :         /* Caller might not want any data. */
    1713         423 :         if ((!buf || !buf_size) && !line_count)
    1714             :                 return true;
    1715             : 
    1716         423 :         data = get_data(data_ring, blk_lpos, &data_size);
    1717         423 :         if (!data)
    1718             :                 return false;
    1719             : 
    1720             :         /*
     1721             :          * The actual size cannot be less than expected. It can be more than expected
    1722             :          * because of the trailing alignment padding.
    1723             :          *
    1724             :          * Note that invalid @len values can occur because the caller loads
    1725             :          * the value during an allowed data race.
    1726             :          */
    1727         423 :         if (data_size < (unsigned int)len)
    1728             :                 return false;
    1729             : 
    1730             :         /* Caller interested in the line count? */
    1731         423 :         if (line_count)
    1732           0 :                 *line_count = count_lines(data, len);
    1733             : 
    1734             :         /* Caller interested in the data content? */
    1735         423 :         if (!buf || !buf_size)
    1736             :                 return true;
    1737             : 
    1738         423 :         data_size = min_t(u16, buf_size, len);
    1739             : 
    1740         846 :         memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
    1741         423 :         return true;
    1742             : }
    1743             : 
    1744             : /*
    1745             :  * This is an extended version of desc_read(). It gets a copy of a specified
    1746             :  * descriptor. However, it also verifies that the record is finalized and has
    1747             :  * the sequence number @seq. On success, 0 is returned.
    1748             :  *
    1749             :  * Error return values:
    1750             :  * -EINVAL: A finalized record with sequence number @seq does not exist.
    1751             :  * -ENOENT: A finalized record with sequence number @seq exists, but its data
    1752             :  *          is not available. This is a valid record, so readers should
    1753             :  *          continue with the next record.
    1754             :  */
    1755        1323 : static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
    1756             :                                    unsigned long id, u64 seq,
    1757             :                                    struct prb_desc *desc_out)
    1758             : {
    1759        1323 :         struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
    1760             :         enum desc_state d_state;
    1761             :         u64 s;
    1762             : 
    1763        1323 :         d_state = desc_read(desc_ring, id, desc_out, &s, NULL);
    1764             : 
    1765             :         /*
    1766             :          * An unexpected @id (desc_miss) or @seq mismatch means the record
    1767             :          * does not exist. A descriptor in the reserved or committed state
    1768             :          * means the record does not yet exist for the reader.
    1769             :          */
    1770        1323 :         if (d_state == desc_miss ||
    1771        1245 :             d_state == desc_reserved ||
    1772         846 :             d_state == desc_committed ||
    1773         846 :             s != seq) {
    1774             :                 return -EINVAL;
    1775             :         }
    1776             : 
    1777             :         /*
    1778             :          * A descriptor in the reusable state may no longer have its data
    1779             :          * available; report it as existing but with lost data. Or the record
    1780             :          * may actually be a record with lost data.
    1781             :          */
    1782        1692 :         if (d_state == desc_reusable ||
    1783         846 :             (blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
    1784             :                 return -ENOENT;
    1785             :         }
    1786             : 
    1787         846 :         return 0;
    1788             : }
    1789             : 
    1790             : /*
    1791             :  * Copy the ringbuffer data from the record with @seq to the provided
    1792             :  * @r buffer. On success, 0 is returned.
    1793             :  *
    1794             :  * See desc_read_finalized_seq() for error return values.
    1795             :  */
    1796         900 : static int prb_read(struct printk_ringbuffer *rb, u64 seq,
    1797             :                     struct printk_record *r, unsigned int *line_count)
    1798             : {
    1799         900 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
    1800        1800 :         struct printk_info *info = to_info(desc_ring, seq);
    1801        1800 :         struct prb_desc *rdesc = to_desc(desc_ring, seq);
    1802         900 :         atomic_long_t *state_var = &rdesc->state_var;
    1803             :         struct prb_desc desc;
    1804             :         unsigned long id;
    1805             :         int err;
    1806             : 
    1807             :         /* Extract the ID, used to specify the descriptor to read. */
    1808         900 :         id = DESC_ID(atomic_long_read(state_var));
    1809             : 
    1810             :         /* Get a local copy of the correct descriptor (if available). */
    1811         900 :         err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
    1812             : 
    1813             :         /*
    1814             :          * If @r is NULL, the caller is only interested in the availability
    1815             :          * of the record.
    1816             :          */
    1817         900 :         if (err || !r)
    1818             :                 return err;
    1819             : 
    1820             :         /* If requested, copy meta data. */
    1821         423 :         if (r->info)
    1822         846 :                 memcpy(r->info, info, sizeof(*(r->info)));
    1823             : 
    1824             :         /* Copy text data. If it fails, this is a data-less record. */
    1825         423 :         if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
    1826             :                        r->text_buf, r->text_buf_size, line_count)) {
    1827             :                 return -ENOENT;
    1828             :         }
    1829             : 
    1830             :         /* Ensure the record is still finalized and has the same @seq. */
    1831         423 :         return desc_read_finalized_seq(desc_ring, id, seq, &desc);
    1832             : }
    1833             : 
    1834             : /* Get the sequence number of the tail descriptor. */
    1835         477 : static u64 prb_first_seq(struct printk_ringbuffer *rb)
    1836             : {
    1837         477 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
    1838             :         enum desc_state d_state;
    1839             :         struct prb_desc desc;
    1840             :         unsigned long id;
    1841             :         u64 seq;
    1842             : 
    1843             :         for (;;) {
    1844         954 :                 id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */
    1845             : 
    1846         477 :                 d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */
    1847             : 
    1848             :                 /*
    1849             :                  * This loop will not be infinite because the tail is
    1850             :                  * _always_ in the finalized or reusable state.
    1851             :                  */
    1852         477 :                 if (d_state == desc_finalized || d_state == desc_reusable)
    1853             :                         break;
    1854             : 
    1855             :                 /*
    1856             :                  * Guarantee the last state load from desc_read() is before
    1857             :                  * reloading @tail_id in order to see a new tail in the case
    1858             :                  * that the descriptor has been recycled. This pairs with
    1859             :                  * desc_reserve:D.
    1860             :                  *
    1861             :                  * Memory barrier involvement:
    1862             :                  *
    1863             :                  * If prb_first_seq:B reads from desc_reserve:F, then
    1864             :                  * prb_first_seq:A reads from desc_push_tail:B.
    1865             :                  *
    1866             :                  * Relies on:
    1867             :                  *
    1868             :                  * MB from desc_push_tail:B to desc_reserve:F
    1869             :                  *    matching
    1870             :                  * RMB prb_first_seq:B to prb_first_seq:A
    1871             :                  */
    1872           0 :                 smp_rmb(); /* LMM(prb_first_seq:C) */
    1873             :         }
    1874             : 
    1875         477 :         return seq;
    1876             : }
    1877             : 
    1878             : /*
    1879             :  * Non-blocking read of a record. On success, @seq is updated to the record
    1880             :  * actually read and (if provided) @r and @line_count hold the read data.
    1881             :  *
    1882             :  * See the description of prb_read_valid() and prb_read_valid_info()
    1883             :  * for details.
    1884             :  */
    1885         900 : static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
    1886             :                             struct printk_record *r, unsigned int *line_count)
    1887             : {
    1888             :         u64 tail_seq;
    1889             :         int err;
    1890             : 
    1891        1800 :         while ((err = prb_read(rb, *seq, r, line_count))) {
    1892         477 :                 tail_seq = prb_first_seq(rb);
    1893             : 
    1894         477 :                 if (*seq < tail_seq) {
    1895             :                         /*
    1896             :                          * Behind the tail. Catch up and try again. This
    1897             :                          * can happen for -ENOENT and -EINVAL cases.
    1898             :                          */
    1899           0 :                         *seq = tail_seq;
    1900             : 
    1901         477 :                 } else if (err == -ENOENT) {
    1902             :                         /* Record exists, but no data available. Skip. */
    1903           0 :                         (*seq)++;
    1904             : 
    1905             :                 } else {
    1906             :                         /* Non-existent/non-finalized record. Must stop. */
    1907             :                         return false;
    1908             :                 }
    1909             :         }
    1910             : 
    1911             :         return true;
    1912             : }
    1913             : 
    1914             : /**
    1915             :  * prb_read_valid() - Non-blocking read of a requested record or (if gone)
    1916             :  *                    the next available record.
    1917             :  *
    1918             :  * @rb:  The ringbuffer to read from.
    1919             :  * @seq: The sequence number of the record to read.
    1920             :  * @r:   A record data buffer to store the read record to.
    1921             :  *
    1922             :  * This is the public function available to readers to read a record.
    1923             :  *
    1924             :  * The reader provides the @info and @text_buf buffers of @r to be
    1925             :  * filled in. Any of the buffer pointers can be set to NULL if the reader
    1926             :  * is not interested in that data. To ensure proper initialization of @r,
    1927             :  * prb_rec_init_rd() should be used.
    1928             :  *
    1929             :  * Context: Any context.
    1930             :  * Return: true if a record was read, otherwise false.
    1931             :  *
    1932             :  * On success, the reader must check r->info->seq to see which record was
    1933             :  * actually read. This allows the reader to detect dropped records.
    1934             :  *
    1935             :  * Failure means @seq refers to a not yet written record.
    1936             :  */
    1937         899 : bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
    1938             :                     struct printk_record *r)
    1939             : {
    1940         899 :         return _prb_read_valid(rb, &seq, r, NULL);
    1941             : }
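/*
 * Editor's usage sketch (illustration only, not part of the kernel source,
 * hence no line/coverage data): a minimal reader loop over all currently
 * finalized records. The helper name and the buffer size are arbitrary;
 * the calls used (prb_rec_init_rd(), prb_read_valid()) are the reader API
 * documented above.
 */
static void example_read_all(struct printk_ringbuffer *rb)
{
        struct printk_info info;
        struct printk_record r;
        char text[128];
        u64 seq = 0;

        prb_rec_init_rd(&r, &info, text, sizeof(text));

        while (prb_read_valid(rb, seq, &r)) {
                /* info.seq can be newer than @seq if records were dropped. */
                if (info.seq != seq) {
                        /* info.seq - seq records were missed; note it here. */
                }

                /*
                 * Consume the record: info holds the meta data and text[]
                 * holds up to info.text_len bytes of message text, truncated
                 * to the provided buffer size.
                 */

                seq = info.seq + 1;
        }
}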
    1942             : 
    1943             : /**
    1944             :  * prb_read_valid_info() - Non-blocking read of meta data for a requested
    1945             :  *                         record or (if gone) the next available record.
    1946             :  *
    1947             :  * @rb:         The ringbuffer to read from.
    1948             :  * @seq:        The sequence number of the record to read.
    1949             :  * @info:       A buffer to store the read record meta data to.
    1950             :  * @line_count: A buffer to store the number of lines in the record text.
    1951             :  *
    1952             :  * This is the public function available to readers to read only the
    1953             :  * meta data of a record.
    1954             :  *
    1955             :  * The reader provides the @info and @line_count buffers to be filled in.
    1956             :  * Either of the buffer pointers can be set to NULL if the reader is not
    1957             :  * interested in that data.
    1958             :  *
    1959             :  * Context: Any context.
    1960             :  * Return: true if a record's meta data was read, otherwise false.
    1961             :  *
    1962             :  * On success, the reader must check info->seq to see which record meta data
    1963             :  * was actually read. This allows the reader to detect dropped records.
    1964             :  *
    1965             :  * Failure means @seq refers to a not yet written record.
    1966             :  */
    1967           0 : bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
    1968             :                          struct printk_info *info, unsigned int *line_count)
    1969             : {
    1970             :         struct printk_record r;
    1971             : 
    1972           0 :         prb_rec_init_rd(&r, info, NULL, 0);
    1973             : 
    1974           0 :         return _prb_read_valid(rb, &seq, &r, line_count);
    1975             : }
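/*
 * Editor's sketch (illustration only, no line/coverage data): counting the
 * records and text lines that are finalized at or after @seq, using only
 * the meta data. The helper name and out-parameters are hypothetical.
 */
static void example_count_pending(struct printk_ringbuffer *rb, u64 seq,
                                  u64 *records, u64 *lines)
{
        struct printk_info info;
        unsigned int line_count;

        *records = 0;
        *lines = 0;

        while (prb_read_valid_info(rb, seq, &info, &line_count)) {
                *records += 1;
                *lines += line_count;
                seq = info.seq + 1;
        }
}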
    1976             : 
    1977             : /**
    1978             :  * prb_first_valid_seq() - Get the sequence number of the oldest available
    1979             :  *                         record.
    1980             :  *
    1981             :  * @rb: The ringbuffer to get the sequence number from.
    1982             :  *
    1983             :  * This is the public function available to readers to see what the
    1984             :  * first/oldest valid sequence number is.
    1985             :  *
    1986             :  * This provides readers a starting point to begin iterating the ringbuffer.
    1987             :  *
    1988             :  * Context: Any context.
    1989             :  * Return: The sequence number of the first/oldest record, or 0 if the
    1990             :  *         ringbuffer is empty.
    1991             :  */
    1992           0 : u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
    1993             : {
    1994           0 :         u64 seq = 0;
    1995             : 
    1996           0 :         if (!_prb_read_valid(rb, &seq, NULL, NULL))
    1997             :                 return 0;
    1998             : 
    1999           0 :         return seq;
    2000             : }
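/*
 * Editor's note and sketch (illustration only, no line/coverage data): a
 * return value of 0 is ambiguous here, since it is used both for an empty
 * ringbuffer and when record 0 itself is still available. If emptiness is
 * what matters, attempting a read is more direct. The helper name is
 * hypothetical.
 */
static bool example_rb_is_empty(struct printk_ringbuffer *rb)
{
        /* With a NULL record, only availability is checked; nothing is copied. */
        return !prb_read_valid(rb, 0, NULL);
}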
    2001             : 
    2002             : /**
    2003             :  * prb_next_seq() - Get the sequence number after the last available record.
    2004             :  *
    2005             :  * @rb:  The ringbuffer to get the sequence number from.
    2006             :  *
    2007             :  * This is the public function available to readers to see what the
    2008             :  * sequence number of the next newest record will be.
    2009             :  *
    2010             :  * This provides readers a sequence number to jump to if all currently
    2011             :  * available records should be skipped.
    2012             :  *
    2013             :  * Context: Any context.
    2014             :  * Return: The sequence number of the next newest (not yet available) record
    2015             :  *         for readers.
    2016             :  */
    2017           1 : u64 prb_next_seq(struct printk_ringbuffer *rb)
    2018             : {
    2019           1 :         struct prb_desc_ring *desc_ring = &rb->desc_ring;
    2020             :         enum desc_state d_state;
    2021             :         unsigned long id;
    2022             :         u64 seq;
    2023             : 
    2024             :         /* Check if the cached @id still points to a valid @seq. */
    2025           2 :         id = atomic_long_read(&desc_ring->last_finalized_id);
    2026           1 :         d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
    2027             : 
    2028           1 :         if (d_state == desc_finalized || d_state == desc_reusable) {
    2029             :                 /*
    2030             :                  * Begin searching after the last finalized record.
    2031             :                  *
    2032             :                  * On 0, the search must begin at 0: because of hack#2
    2033             :                  * of the bootstrapping phase, it is not known whether
    2034             :                  * a record at index 0 exists.
    2035             :                  */
    2036           1 :                 if (seq != 0)
    2037           1 :                         seq++;
    2038             :         } else {
    2039             :                 /*
    2040             :                  * The information about the last finalized sequence number
    2041             :                  * has gone. It should happen only when there is a flood of
    2042             :                  * new messages and the ringbuffer is rapidly recycled.
    2043             :                  * Give up and start from the beginning.
    2044             :                  */
    2045           0 :                 seq = 0;
    2046             :         }
    2047             : 
    2048             :         /*
    2049             :          * The information about the last finalized @seq might be inaccurate.
    2050             :          * Search forward to find the current one.
    2051             :          */
    2052           1 :         while (_prb_read_valid(rb, &seq, NULL, NULL))
    2053           0 :                 seq++;
    2054             : 
    2055           1 :         return seq;
    2056             : }
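/*
 * Editor's sketch (illustration only, no line/coverage data): combining the
 * two sequence-number queries. prb_first_valid_seq() gives a starting point
 * for iteration; prb_next_seq() gives the point where a reader that wants to
 * skip the existing backlog should begin. Their difference is an upper bound
 * on the number of currently readable records (data-less records in between
 * are skipped by readers). The helper name is hypothetical.
 */
static u64 example_available_records(struct printk_ringbuffer *rb)
{
        u64 first = prb_first_valid_seq(rb);    /* oldest available record */
        u64 next = prb_next_seq(rb);            /* first not-yet-available  */

        /* Both values are snapshots and can move concurrently. */
        return next - first;
}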
    2057             : 
    2058             : /**
    2059             :  * prb_init() - Initialize a ringbuffer to use provided external buffers.
    2060             :  *
    2061             :  * @rb:       The ringbuffer to initialize.
    2062             :  * @text_buf: The data buffer for text data.
    2063             :  * @textbits: The size of @text_buf as a power-of-2 value.
    2064             :  * @descs:    The descriptor buffer for ringbuffer records.
    2065             :  * @descbits: The count of @descs items as a power-of-2 value.
    2066             :  * @infos:    The printk_info buffer for ringbuffer records.
    2067             :  *
    2068             :  * This is the public function available to writers to setup a ringbuffer
    2069             :  * during runtime using provided buffers.
    2070             :  *
    2071             :  * This must match the initialization of DEFINE_PRINTKRB().
    2072             :  *
    2073             :  * Context: Any context.
    2074             :  */
    2075           0 : void prb_init(struct printk_ringbuffer *rb,
    2076             :               char *text_buf, unsigned int textbits,
    2077             :               struct prb_desc *descs, unsigned int descbits,
    2078             :               struct printk_info *infos)
    2079             : {
    2080           0 :         memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0]));
    2081           0 :         memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0]));
    2082             : 
    2083           0 :         rb->desc_ring.count_bits = descbits;
    2084           0 :         rb->desc_ring.descs = descs;
    2085           0 :         rb->desc_ring.infos = infos;
    2086           0 :         atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
    2087           0 :         atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
    2088           0 :         atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits));
    2089             : 
    2090           0 :         rb->text_data_ring.size_bits = textbits;
    2091           0 :         rb->text_data_ring.data = text_buf;
    2092           0 :         atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits));
    2093           0 :         atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits));
    2094             : 
    2095           0 :         atomic_long_set(&rb->fail, 0);
    2096             : 
    2097           0 :         atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits));
    2098           0 :         descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS;
    2099           0 :         descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS;
    2100             : 
    2101           0 :         infos[0].seq = -(u64)_DESCS_COUNT(descbits);
    2102           0 :         infos[_DESCS_COUNT(descbits) - 1].seq = 0;
    2103           0 : }
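/*
 * Editor's setup sketch (illustration only, no line/coverage data): building
 * a small ringbuffer from statically allocated buffers at runtime. The sizes
 * are arbitrary example values; both size arguments are power-of-2 exponents
 * and the array lengths mirror them, matching what DEFINE_PRINTKRB() does.
 */
#define EX_TEXT_BITS 12                         /* 4 KiB of text data */
#define EX_DESC_BITS 5                          /* 32 descriptors     */

static char ex_text[1 << EX_TEXT_BITS];
static struct prb_desc ex_descs[1 << EX_DESC_BITS];
static struct printk_info ex_infos[1 << EX_DESC_BITS];
static struct printk_ringbuffer ex_rb;

static void example_setup(void)
{
        prb_init(&ex_rb, ex_text, EX_TEXT_BITS,
                 ex_descs, EX_DESC_BITS, ex_infos);
}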
    2104             : 
    2105             : /**
    2106             :  * prb_record_text_space() - Query the full actual used ringbuffer space for
    2107             :  *                           the text data of a reserved entry.
    2108             :  *
    2109             :  * @e: The successfully reserved entry to query.
    2110             :  *
    2111             :  * This is the public function available to writers to see how much actual
    2112             :  * space is used in the ringbuffer to store the text data of the specified
    2113             :  * entry.
    2114             :  *
    2115             :  * This function is only valid if @e has been successfully reserved using
    2116             :  * prb_reserve().
    2117             :  *
    2118             :  * Context: Any context.
    2119             :  * Return: The size in bytes used by the text data of the associated record.
    2120             :  */
    2121           0 : unsigned int prb_record_text_space(struct prb_reserved_entry *e)
    2122             : {
    2123           0 :         return e->text_space;
    2124             : }
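/*
 * Editor's sketch (illustration only, no line/coverage data): querying the
 * text space consumed by a successful reservation before committing it.
 * prb_rec_init_wr(), prb_reserve() and prb_commit() are the writer API from
 * this file and its header; the requested size is an example value and the
 * helper name is hypothetical.
 */
static unsigned int example_reserve_and_measure(struct printk_ringbuffer *rb)
{
        struct prb_reserved_entry e;
        struct printk_record r;
        unsigned int space = 0;

        prb_rec_init_wr(&r, 48);        /* request room for 48 text bytes */

        if (prb_reserve(&e, rb, &r)) {
                /* r.text_buf now points into the ringbuffer; fill it here. */
                space = prb_record_text_space(&e); /* ID + text + padding */
                prb_commit(&e);
        }

        return space;
}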

Generated by: LCOV version 1.14